Posted to commits@hive.apache.org by vi...@apache.org on 2014/08/08 01:23:24 UTC

svn commit: r1616627 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: DagUtils.java ObjectCache.java tools/InputMerger.java

Author: vikram
Date: Thu Aug  7 23:23:23 2014
New Revision: 1616627

URL: http://svn.apache.org/r1616627
Log:
HIVE-7639: Bring tez-branch up to the API changes in TEZ-1379, TEZ-1057, TEZ-1382 (Gopal V via Vikram Dixit)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1616627&r1=1616626&r2=1616627&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Thu Aug  7 23:23:23 2014
@@ -81,6 +81,8 @@ import org.apache.hadoop.yarn.util.Conve
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -302,7 +304,7 @@ public class DagUtils {
     String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
     String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
     String partitionerClassName = conf.get("mapred.partitioner.class");
-    Configuration partitionerConf;
+    Map<String, String> partitionerConf;
 
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
@@ -352,12 +354,12 @@ public class DagUtils {
    *          a base configuration to extract relevant properties
    * @return
    */
-  private Configuration createPartitionerConf(String partitionerClassName,
+  private Map<String, String> createPartitionerConf(String partitionerClassName,
       Configuration baseConf) {
-    Configuration partitionerConf = new Configuration(false);
-    partitionerConf.set("mapred.partitioner.class", partitionerClassName);
+    Map<String, String> partitionerConf = new HashMap<String, String>();
+    partitionerConf.put("mapred.partitioner.class", partitionerClassName);
     if (baseConf.get("mapreduce.totalorderpartitioner.path") != null) {
-      partitionerConf.set("mapreduce.totalorderpartitioner.path",
+      partitionerConf.put("mapreduce.totalorderpartitioner.path",
       baseConf.get("mapreduce.totalorderpartitioner.path"));
     }
     return partitionerConf;
@@ -491,8 +493,8 @@ public class DagUtils {
       mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
     }
     map.addDataSource(alias,
-        new InputDescriptor(MRInputLegacy.class.getName()).
-        setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput));
+        new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class.getName()).
+        setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput),null));
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
@@ -946,9 +948,9 @@ public class DagUtils {
 
     // final vertices need to have at least one output
     if (!hasChildren) {
-      v.addDataSink("out_"+work.getName(),
+      v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
           new OutputDescriptor(MROutput.class.getName())
-          .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null);
+          .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null, null));
     }
 
     return v;
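
[Editor's note] A minimal usage sketch of the wrapping pattern the two hunks above introduce (TEZ-1382): inputs and outputs now hang off DataSourceDescriptor/DataSinkDescriptor wrappers instead of bare InputDescriptor/OutputDescriptor arguments. Only the constructor shapes visible in this diff are used; the dataSource/dataSink locals are illustrative, and the trailing null arguments are the credentials/committer slots this commit leaves unset.

    // Data source: wrap the InputDescriptor together with its InputInitializerDescriptor.
    DataSourceDescriptor dataSource = new DataSourceDescriptor(
        new InputDescriptor(MRInputLegacy.class.getName()).setUserPayload(mrInput),
        new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput),
        null);
    map.addDataSource(alias, dataSource);

    // Data sink: wrap the OutputDescriptor the same way for leaf vertices.
    DataSinkDescriptor dataSink = new DataSinkDescriptor(
        new OutputDescriptor(MROutput.class.getName())
            .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
        null, null);
    v.addDataSink("out_" + work.getName(), dataSink);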

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1616627&r1=1616626&r2=1616627&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Thu Aug  7 23:23:23 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 
@@ -36,7 +35,7 @@ public class ObjectCache implements org.
   @Override
   public void cache(String key, Object value) {
     LOG.info("Adding " + key + " to cache with value " + value);
-    registry.add(ObjectLifeCycle.VERTEX, key, value);
+    registry.cacheForVertex(key, value);
   }
 
   @Override
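
[Editor's note] A hedged sketch of the registry change above (TEZ-1379), restricted to the two calls shown in the hunk; key and value are whatever the caller passes to cache(), nothing Hive-specific is assumed here.

    public void cache(String key, Object value) {
      // Before this commit the lifetime was passed as an enum argument:
      //   registry.add(ObjectLifeCycle.VERTEX, key, value);
      // After TEZ-1379 the vertex scope is expressed by the method name itself,
      // so the ObjectLifeCycle import goes away entirely.
      registry.cacheForVertex(key, value);
    }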

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java?rev=1616627&r1=1616626&r2=1616627&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java Thu Aug  7 23:23:23 2014
@@ -37,7 +37,7 @@ import org.apache.tez.runtime.library.ap
  * Uses a priority queue to pick the KeyValuesReader of the input that is next in
  * sort order.
  */
-public class InputMerger implements KeyValuesReader {
+public class InputMerger extends KeyValuesReader {
 
   public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
   private PriorityQueue<KeyValuesReader> pQueue = null;
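
[Editor's note] The class javadoc above describes the merge strategy: a priority queue ordered by each reader's current key picks whichever sorted input comes next. Below is a self-contained, illustrative sketch of that k-way merge pattern in plain Java; it is not the InputMerger code itself and is not tied to the Tez API.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeSketch {
      public static void main(String[] args) {
        // Three already-sorted inputs stand in for the sorted KeyValuesReaders.
        List<Deque<Integer>> inputs = Arrays.asList(
            new ArrayDeque<Integer>(Arrays.asList(1, 4, 7)),
            new ArrayDeque<Integer>(Arrays.asList(2, 5, 8)),
            new ArrayDeque<Integer>(Arrays.asList(3, 6, 9)));

        // Priority queue ordered by each input's current head element.
        PriorityQueue<Deque<Integer>> pQueue = new PriorityQueue<>(
            (a, b) -> Integer.compare(a.peekFirst(), b.peekFirst()));
        for (Deque<Integer> in : inputs) {
          if (!in.isEmpty()) {
            pQueue.add(in);
          }
        }

        // Repeatedly take the input whose head is smallest, emit that head,
        // and re-insert the input so it is re-ordered by its new head.
        List<Integer> merged = new ArrayList<>();
        while (!pQueue.isEmpty()) {
          Deque<Integer> next = pQueue.poll();
          merged.add(next.pollFirst());
          if (!next.isEmpty()) {
            pQueue.add(next);
          }
        }
        System.out.println(merged); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
      }
    }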