You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/04/12 20:25:26 UTC

[hive] branch master updated: HIVE-23180 : Remove unused variables from tez build dag (Mustafa Iman via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d9ae2f  HIVE-23180 : Remove unused variables from tez build dag (Mustafa Iman via Ashutosh Chauhan)
7d9ae2f is described below

commit 7d9ae2f2c7879663618de9dffb482f731742b36e
Author: Mustafa Iman <mu...@gmail.com>
AuthorDate: Sun Apr 12 13:24:38 2020 -0700

    HIVE-23180 : Remove unused variables from tez build dag (Mustafa Iman via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../apache/hadoop/hive/ql/exec/tez/DagUtils.java   | 81 ++++++++++------------
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java    | 62 ++++++++---------
 .../hive/ql/udf/generic/GenericUDTFGetSplits.java  |  5 +-
 .../hadoop/hive/ql/exec/tez/TestTezTask.java       |  4 +-
 4 files changed, 67 insertions(+), 85 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 9b2dae3..6f5830d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -153,7 +153,6 @@ import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
-import org.apache.hadoop.hive.ql.exec.tez.NullMROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
@@ -667,15 +666,14 @@ public class DagUtils {
     }
   }
 
-  private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, FileSystem fs,
-      Path mrScratchDir, Context ctx, VertexType vertexType,
-      Map<String, LocalResource> localResources) throws Exception {
+  private Vertex createVertexFromMergeWork(JobConf conf, MergeJoinWork mergeJoinWork,
+      Path mrScratchDir, VertexType vertexType) throws Exception {
     Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
     if (mergeJoinWork.getMainWork() instanceof MapWork) {
       List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
       MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
-      Vertex mergeVx = createVertex(
-          conf, mapWork, fs, mrScratchDir, ctx, vertexType, localResources);
+      Vertex mergeVx = createVertexFromMapWork(
+          conf, mapWork, mrScratchDir, vertexType);
 
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
       // mapreduce.tez.input.initializer.serialize.event.payload should be set
@@ -718,17 +716,16 @@ public class DagUtils {
       mergeVx.setVertexManagerPlugin(desc);
       return mergeVx;
     } else {
-      return createVertex(conf,
-          (ReduceWork) mergeJoinWork.getMainWork(), fs, mrScratchDir, ctx, localResources);
+      return createVertexFromReduceWork(conf,
+          (ReduceWork) mergeJoinWork.getMainWork(), mrScratchDir);
     }
   }
 
   /*
    * Helper function to create Vertex from MapWork.
    */
-  private Vertex createVertex(JobConf conf, MapWork mapWork,
-      FileSystem fs, Path mrScratchDir, Context ctx, VertexType vertexType,
-      Map<String, LocalResource> localResources) throws Exception {
+  private Vertex createVertexFromMapWork(JobConf conf, MapWork mapWork, Path mrScratchDir,
+      VertexType vertexType) throws Exception {
 
     // set up the operator plan
     Utilities.cacheMapWork(conf, mapWork, mrScratchDir);
@@ -846,21 +843,16 @@ public class DagUtils {
       procClassName = MergeFileTezProcessor.class.getName();
     }
 
-    VertexExecutionContext executionContext = createVertexExecutionContext(mapWork);
-
     map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
         .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
 
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
-    map.setExecutionContext(executionContext);
-    map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
     // Add the actual source input
     String alias = mapWork.getAliasToWork().keySet().iterator().next();
     map.addDataSource(alias, dataSource);
-    map.addTaskLocalFiles(localResources);
     return map;
   }
 
@@ -899,8 +891,7 @@ public class DagUtils {
   /*
    * Helper function to create Vertex for given ReduceWork.
    */
-  private Vertex createVertex(JobConf conf, ReduceWork reduceWork, FileSystem fs,
-      Path mrScratchDir, Context ctx, Map<String, LocalResource> localResources)
+  private Vertex createVertexFromReduceWork(JobConf conf, ReduceWork reduceWork, Path mrScratchDir)
           throws Exception {
 
     // set up operator plan
@@ -910,8 +901,6 @@ public class DagUtils {
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, reduceWork);
 
-    VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(reduceWork);
-
     // create the vertex
     Vertex reducer = Vertex.create(reduceWork.getName(),
         ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
@@ -921,9 +910,6 @@ public class DagUtils {
             reduceWork.getNumReduceTasks(), getContainerResource(conf));
 
     reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
-    reducer.setExecutionContext(vertexExecutionContext);
-    reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
-    reducer.addTaskLocalFiles(localResources);
     return reducer;
   }
 
@@ -1444,40 +1430,38 @@ public class DagUtils {
    * Create a vertex from a given work object.
    *
    * @param conf JobConf to be used to this execution unit
-   * @param work The instance of BaseWork representing the actual work to be performed
+   * @param workUnit The instance of BaseWork representing the actual work to be performed
    * by this vertex.
    * @param scratchDir HDFS scratch dir for this execution unit.
-   * @param fileSystem FS corresponding to scratchDir and LocalResources
-   * @param ctx This query's context
    * @return Vertex
    */
   @SuppressWarnings("deprecation")
-  public Vertex createVertex(JobConf conf, BaseWork work,
-      Path scratchDir, FileSystem fileSystem, Context ctx, boolean hasChildren,
-      TezWork tezWork, VertexType vertexType, Map<String, LocalResource> localResources) throws Exception {
+  public Vertex createVertex(JobConf conf, BaseWork workUnit, Path scratchDir,
+      TezWork tezWork, Map<String, LocalResource> localResources) throws Exception {
 
-    Vertex v = null;
+    Vertex vertex;
     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
-    if (work instanceof MapWork) {
-      v = createVertex(
-          conf, (MapWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
-    } else if (work instanceof ReduceWork) {
-      v = createVertex(conf, (ReduceWork) work, fileSystem, scratchDir, ctx, localResources);
-    } else if (work instanceof MergeJoinWork) {
-      v = createVertex(
-          conf, (MergeJoinWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
+    VertexType vertexType = tezWork.getVertexType(workUnit);
+    if (workUnit instanceof MapWork) {
+      vertex = createVertexFromMapWork(
+          conf, (MapWork) workUnit, scratchDir, vertexType);
+    } else if (workUnit instanceof ReduceWork) {
+      vertex = createVertexFromReduceWork(conf, (ReduceWork) workUnit, scratchDir);
+    } else if (workUnit instanceof MergeJoinWork) {
+      vertex = createVertexFromMergeWork(
+          conf, (MergeJoinWork) workUnit, scratchDir, vertexType);
       // set VertexManagerPlugin if whether it's a cross product destination vertex
       List<String> crossProductSources = new ArrayList<>();
-      for (BaseWork parentWork : tezWork.getParents(work)) {
-        if (tezWork.getEdgeType(parentWork, work) == EdgeType.XPROD_EDGE) {
+      for (BaseWork parentWork : tezWork.getParents(workUnit)) {
+        if (tezWork.getEdgeType(parentWork, workUnit) == EdgeType.XPROD_EDGE) {
           crossProductSources.add(parentWork.getName());
         }
       }
 
       if (!crossProductSources.isEmpty()) {
         CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources);
-        v.setVertexManagerPlugin(
+        vertex.setVertexManagerPlugin(
           VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
             .setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf))));
         // parallelism shouldn't be set for cartesian product vertex
@@ -1486,14 +1470,18 @@ public class DagUtils {
       // something is seriously wrong if this is happening
       throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
     }
+    VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(workUnit);
+    vertex.addTaskLocalFiles(localResources);
+    vertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
+    vertex.setExecutionContext(vertexExecutionContext);
 
     // initialize stats publisher if necessary
-    if (work.isGatheringStats()) {
+    if (workUnit.isGatheringStats()) {
       StatsPublisher statsPublisher;
       StatsFactory factory = StatsFactory.newFactory(conf);
       if (factory != null) {
         StatsCollectionContext sCntxt = new StatsCollectionContext(conf);
-        sCntxt.setStatsTmpDirs(Utilities.getStatsTmpDirs(work, conf));
+        sCntxt.setStatsTmpDirs(Utilities.getStatsTmpDirs(workUnit, conf));
         statsPublisher = factory.getStatsPublisher();
         if (!statsPublisher.init(sCntxt)) { // creating stats table if not exists
           if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
@@ -1512,13 +1500,14 @@ public class DagUtils {
       outputKlass = MROutput.class;
     }
     // final vertices need to have at least one output
-    if (!hasChildren) {
-      v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
+    boolean endVertex = tezWork.getLeaves().contains(workUnit);
+    if (endVertex) {
+      vertex.addDataSink("out_"+workUnit.getName(), new DataSinkDescriptor(
           OutputDescriptor.create(outputKlass.getName())
           .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, null));
     }
 
-    return v;
+    return vertex;
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 3599d19..854bc89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -34,7 +34,6 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -404,17 +403,15 @@ public class TezTask extends Task<TezWork> {
     }
   }
 
-  DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
+  DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx,
       Map<String, LocalResource> vertexResources) throws Exception {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
 
     // getAllWork returns a topologically sorted list, which we use to make
     // sure that vertices are created before they are used in edges.
-    List<BaseWork> ws = work.getAllWork();
-    Collections.reverse(ws);
-
-    FileSystem fs = scratchDir.getFileSystem(conf);
+    List<BaseWork> topologicalWorkList = tezWork.getAllWork();
+    Collections.reverse(topologicalWorkList);
 
     // the name of the dag is what is displayed in the AM/Job UI
     String dagName = utils.createDagName(conf, queryPlan);
@@ -435,13 +432,12 @@ public class TezTask extends Task<TezWork> {
     dag.setCredentials(conf.getCredentials());
     setAccessControlsForCurrentUser(dag, queryPlan.getQueryId(), conf);
 
-    for (BaseWork w: ws) {
-      boolean isFinal = work.getLeaves().contains(w);
+    for (BaseWork workUnit: topologicalWorkList) {
 
       // translate work to vertex
-      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + workUnit.getName());
 
-      if (w instanceof UnionWork) {
+      if (workUnit instanceof UnionWork) {
         // Special case for unions. These items translate to VertexGroups
 
         List<BaseWork> unionWorkItems = new LinkedList<BaseWork>();
@@ -449,8 +445,8 @@ public class TezTask extends Task<TezWork> {
 
         // split the children into vertices that make up the union and vertices that are
         // proper children of the union
-        for (BaseWork v: work.getChildren(w)) {
-          EdgeType type = work.getEdgeProperty(w, v).getEdgeType();
+        for (BaseWork v: tezWork.getChildren(workUnit)) {
+          EdgeType type = tezWork.getEdgeProperty(workUnit, v).getEdgeType();
           if (type == EdgeType.CONTAINS) {
             unionWorkItems.add(v);
           } else {
@@ -458,7 +454,7 @@ public class TezTask extends Task<TezWork> {
           }
         }
         JobConf parentConf = workToConf.get(unionWorkItems.get(0));
-        checkOutputSpec(w, parentConf);
+        checkOutputSpec(workUnit, parentConf);
 
         // create VertexGroup
         Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
@@ -467,7 +463,7 @@ public class TezTask extends Task<TezWork> {
         for (BaseWork v: unionWorkItems) {
           vertexArray[i++] = workToVertex.get(v);
         }
-        VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+        VertexGroup group = dag.createVertexGroup(workUnit.getName(), vertexArray);
 
         // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
         // Pick any one source vertex to figure out the Edge configuration.
@@ -476,47 +472,47 @@ public class TezTask extends Task<TezWork> {
         for (BaseWork v: children) {
           // finally we can create the grouped edge
           GroupInputEdge e = utils.createEdge(group, parentConf,
-               workToVertex.get(v), work.getEdgeProperty(w, v), v, work);
+               workToVertex.get(v), tezWork.getEdgeProperty(workUnit, v), v, tezWork);
 
           dag.addEdge(e);
         }
       } else {
         // Regular vertices
-        JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
-        checkOutputSpec(w, wxConf);
-        Vertex wx = utils.createVertex(wxConf, w, scratchDir, fs, ctx, !isFinal,
-            work, work.getVertexType(w), vertexResources);
-        if (work.getChildren(w).size() > 1) {
-          String value = wxConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB);
+        JobConf wxConf = utils.initializeVertexConf(conf, ctx, workUnit);
+        checkOutputSpec(workUnit, wxConf);
+        Vertex wx = utils.createVertex(wxConf, workUnit, scratchDir,
+            tezWork, vertexResources);
+        if (tezWork.getChildren(workUnit).size() > 1) {
+          String tezRuntimeSortMb = wxConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB);
           int originalValue = 0;
-          if(value == null) {
+          if(tezRuntimeSortMb == null) {
             originalValue = TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT;
           } else {
-            originalValue = Integer.valueOf(value);
+            originalValue = Integer.valueOf(tezRuntimeSortMb);
           }
-          int newValue = (int) (originalValue / work.getChildren(w).size());
+          int newValue = originalValue / tezWork.getChildren(workUnit).size();
           wxConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, Integer.toString(newValue));
           LOG.info("Modified " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " to " + newValue);
         }
-        if (w.getReservedMemoryMB() > 0) {
+        if (workUnit.getReservedMemoryMB() > 0) {
           // If reversedMemoryMB is set, make memory allocation fraction adjustment as needed
-          double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf);
+          double frac = DagUtils.adjustMemoryReserveFraction(workUnit.getReservedMemoryMB(), super.conf);
           LOG.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + frac);
           wx.setConf(TEZ_MEMORY_RESERVE_FRACTION, Double.toString(frac));
         } // Otherwise just leave it up to Tez to decide how much memory to allocate
         dag.addVertex(wx);
-        utils.addCredentials(w, dag);
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-        workToVertex.put(w, wx);
-        workToConf.put(w, wxConf);
+        utils.addCredentials(workUnit, dag);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + workUnit.getName());
+        workToVertex.put(workUnit, wx);
+        workToConf.put(workUnit, wxConf);
 
         // add all dependencies (i.e.: edges) to the graph
-        for (BaseWork v: work.getChildren(w)) {
+        for (BaseWork v: tezWork.getChildren(workUnit)) {
           assert workToVertex.containsKey(v);
           Edge e = null;
 
-          TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
-          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, work);
+          TezEdgeProperty edgeProp = tezWork.getEdgeProperty(workUnit, v);
+          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, tezWork);
           dag.addEdge(e);
         }
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 389f5cc..00a6c89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -440,7 +440,6 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
     // TODO: should we also whitelist input formats here? from mapred.input.format.class
     Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
-    FileSystem fs = scratchDir.getFileSystem(job);
     try {
       LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(ctx.getConf()), utils, job);
 
@@ -453,8 +452,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       // Update the queryId to use the generated applicationId. See comment below about
       // why this is done.
       HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
-      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, fs, ctx, false, work,
-          work.getVertexType(mapWork), DagUtils.createTezLrMap(appJarLr, null));
+      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, work,
+          DagUtils.createTezLrMap(appJarLr, null));
       String vertexName = wx.getName();
       dag.addVertex(wx);
       utils.addCredentials(mapWork, dag);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index c14dc62..6f52d65 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.mapred.JobConf;
@@ -95,8 +94,7 @@ public class TestTezTask {
     when(utils.getTezDir(any(Path.class))).thenReturn(path);
     when(
         utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
-            any(FileSystem.class), any(Context.class),
-            anyBoolean(), any(TezWork.class), any(VertexType.class), any(Map.class))).thenAnswer(
+            any(TezWork.class), any(Map.class))).thenAnswer(
         new Answer<Vertex>() {
 
           @Override