You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink on the original page and is not preserved in this text.
Posted to commits@hive.apache.org by vi...@apache.org on 2014/08/09 01:28:22 UTC

svn commit: r1616905 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: CustomPartitionEdge.java DagUtils.java HiveSplitGenerator.java TezSessionState.java

Author: vikram
Date: Fri Aug  8 23:28:21 2014
New Revision: 1616905

URL: http://svn.apache.org/r1616905
Log:
HIVE-7656: Bring tez-branch up-to the API changes made by TEZ-1372 (Gopal V via Vikram Dixit)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1616905&r1=1616904&r2=1616905&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Fri Aug  8 23:28:21 2014
@@ -43,7 +43,6 @@ public class CustomPartitionEdge extends
   // used by the framework at runtime. initialize is the real initializer at runtime
   public CustomPartitionEdge(EdgeManagerContext context) {
     super(context);
-    this.context = context;
   }
 
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1616905&r1=1616904&r2=1616905&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Aug  8 23:28:21 2014
@@ -79,7 +79,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.PreWarmVertex;
+import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -380,15 +380,6 @@ public class DagUtils {
   }
 
   /*
-   * Helper to setup default environment for a task in YARN.
-   */
-  private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
-    Map<String, String> environment = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(conf, environment, isMap);
-    return environment;
-  }
-
-  /*
    * Helper to determine what java options to use for the containers
    * Falls back to Map-reduces map java opts if no tez specific options
    * are set
@@ -458,7 +449,7 @@ public class DagUtils {
       // is HiveInputFormat
       if (inputFormatClass == HiveInputFormat.class) {
         useTezGroupedSplits = true;
-        conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
+        conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
       }
     }
 
@@ -485,7 +476,9 @@ public class DagUtils {
     map = new Vertex(mapWork.getName(),
         new ProcessorDescriptor(MapTezProcessor.class.getName()).
         setUserPayload(serializedConf), numTasks, getContainerResource(conf));
-    map.setTaskEnvironment(getContainerEnvironment(conf, true));
+    Map<String, String> environment = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
+    map.setTaskEnvironment(environment);
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
@@ -494,9 +487,10 @@ public class DagUtils {
 
     byte[] mrInput = null;
     if (useTezGroupedSplits) {
-      mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf);
+      mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
+          HiveInputFormat.class.getName());
     } else {
-      mrInput = MRHelpers.createMRInputPayload(serializedConf);
+      mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
     }
     map.addDataSource(alias,
         new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class.getName()).
@@ -556,7 +550,11 @@ public class DagUtils {
             reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
                 .getNumReduceTasks(), getContainerResource(conf));
 
-    reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
+    Map<String, String> environment = new HashMap<String, String>();
+
+    MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
+    reducer.setTaskEnvironment(environment);
+
     reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
@@ -600,16 +598,17 @@ public class DagUtils {
   /**
    * @param numContainers number of containers to pre-warm
    * @param localResources additional resources to pre-warm with
-   * @return prewarm vertex to run
+   * @return prewarm context object
    */
-  public PreWarmVertex createPreWarmVertex(TezConfiguration conf,
+  public PreWarmContext createPreWarmContext(TezConfiguration conf,
       int numContainers, Map<String, LocalResource> localResources) throws
       IOException, TezException {
 
     ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
     prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
 
-    PreWarmVertex prewarmVertex = new PreWarmVertex("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
+    PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, getContainerResource(conf),
+        numContainers, null);
 
     Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
 
@@ -617,10 +616,14 @@ public class DagUtils {
       combinedResources.putAll(localResources);
     }
 
-    prewarmVertex.setTaskLocalFiles(localResources);
-    prewarmVertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
-    prewarmVertex.setTaskEnvironment(getContainerEnvironment(conf, false));
-    return prewarmVertex;
+    context.setLocalResources(combinedResources);
+
+    /* boiler plate task env */
+    Map<String, String> environment = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
+    context.setEnvironment(environment);
+    context.setJavaOpts(getContainerJavaOpts(conf));
+    return context;
   }
 
   /**

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1616905&r1=1616904&r2=1616905&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Aug  8 23:28:21 2014
@@ -90,7 +90,7 @@ public class HiveSplitGenerator extends 
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
     InputSplitInfoMem inputSplitInfo = null;
-    String realInputFormatName = conf.get("mapred.input.format.class");
+    String realInputFormatName = userPayloadProto.getInputFormatName();
     if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
       // Need to instantiate the realInputFormat
       InputFormat<?, ?> inputFormat =
@@ -123,8 +123,7 @@ public class HiveSplitGenerator extends 
       inputSplitInfo =
           new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
     } else {
-      // no need for grouping and the target #of tasks.
-      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf, false, 0);
+      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
     }
 
     return createEventList(sendSerializedEvents, inputSplitInfo);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1616905&r1=1616904&r2=1616905&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Fri Aug  8 23:28:21 2014
@@ -50,12 +50,11 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.client.PreWarmVertex;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 
 /**
@@ -171,15 +170,6 @@ public class TezSessionState {
     // generate basic tez config
     TezConfiguration tezConfig = new TezConfiguration(conf);
     tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
-
-    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
-      int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
-      n = Math.max(tezConfig.getInt(
-          TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS,
-          TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT), n);
-      tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
-    }
-
     session = new TezClient("HIVE-" + sessionId, tezConfig, true,
         commonLocalResources, null);
 
@@ -192,10 +182,10 @@ public class TezSessionState {
       int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
       LOG.info("Prewarming " + n + " containers  (id: " + sessionId
           + ", scratch dir: " + tezScratchDir + ")");
-      PreWarmVertex prewarmVertex = utils.createPreWarmVertex(tezConfig, n,
+      PreWarmContext context = utils.createPreWarmContext(tezConfig, n,
           commonLocalResources);
       try {
-        session.preWarm(prewarmVertex);
+        session.preWarm(context);
       } catch (InterruptedException ie) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Hive Prewarm threw an exception ", ie);