Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/14 06:15:51 UTC

svn commit: r1639569 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/HashTableLoader.java optimizer/physical/SparkMapJoinResolver.java plan/MapredLocalWork.java

Author: xuefu
Date: Fri Nov 14 05:15:49 2014
New Revision: 1639569

URL: http://svn.apache.org/r1639569
Log:
HIVE-8776: Generate MapredLocalWork in SparkMapJoinResolver [Spark Branch] (Chao via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1639569&r1=1639568&r2=1639569&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Fri Nov 14 05:15:49 2014
@@ -76,7 +76,7 @@ public class HashTableLoader implements 
       }
       // All HashTables share the same base dir,
       // which is passed in as the tmp path
-      Path baseDir = localWork.getTmpHDFSPath();
+      Path baseDir = localWork.getTmpPath();
       if (baseDir == null) {
         return;
       }

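For context, this hunk is the loader side of the contract: the Spark HashTable sinks are expected to dump each small table under a shared base dir, and HashTableLoader scans that dir to populate its in-memory containers. A minimal sketch of the directory-scanning pattern, using plain Hadoop FileSystem calls; the class and method names below are illustrative, not Hive API:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HashTableDumpScanner {
      // Lists the per-small-table dump files under the shared tmp base dir.
      // Mirrors the null check above: no base dir means nothing was staged.
      public static FileStatus[] listDumps(Configuration conf, Path baseDir)
          throws IOException {
        if (baseDir == null) {
          return new FileStatus[0];
        }
        FileSystem fs = baseDir.getFileSystem(conf);
        return fs.exists(baseDir) ? fs.listStatus(baseDir) : new FileStatus[0];
      }
    }
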
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1639569&r1=1639568&r2=1639569&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Fri Nov 14 05:15:49 2014
@@ -20,24 +20,31 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
@@ -120,7 +127,7 @@ public class SparkMapJoinResolver implem
         // update this information in sparkWorkMap
         sparkWorkMap.put(work, parentWork);
         for (BaseWork parent : parentWorks) {
-          if (containsOp(parent, HashTableSinkOperator.class)) {
+          if (containsOp(parent, SparkHashTableSinkOperator.class)) {
             moveWork(sparkWork, parent, parentWork);
           } else {
             moveWork(sparkWork, parent, targetWork);
@@ -129,6 +136,46 @@ public class SparkMapJoinResolver implem
       }
     }
 
+    private void generateLocalWork(SparkTask originalTask) {
+      SparkWork originalWork = originalTask.getWork();
+      Collection<BaseWork> allBaseWorks = originalWork.getAllWorkUnsorted();
+
+      for (BaseWork work : allBaseWorks) {
+        if (containsOp(work, SparkHashTableSinkOperator.class) ||
+            containsOp(work, MapJoinOperator.class)) {
+          work.setMapRedLocalWork(new MapredLocalWork());
+        }
+      }
+
+      Context ctx = physicalContext.getContext();
+
+      for (BaseWork work : allBaseWorks) {
+        if (containsOp(work, MapJoinOperator.class)) {
+          Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId());
+          MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork();
+          List<Operator<? extends OperatorDesc>> dummyOps =
+              new ArrayList<Operator<? extends OperatorDesc>>(work.getDummyOps());
+          bigTableLocalWork.setDummyParentOp(dummyOps);
+
+          for (BaseWork parentWork : originalWork.getParents(work)) {
+            if (containsOp(parentWork, SparkHashTableSinkOperator.class)) {
+              parentWork.getMapRedLocalWork().setTmpHDFSPath(tmpPath);
+              parentWork.getMapRedLocalWork().setDummyParentOp(
+                  new ArrayList<Operator<? extends OperatorDesc>>());
+            }
+          }
+
+          bigTableLocalWork.setAliasToWork(
+              new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
+          bigTableLocalWork.setAliasToFetchWork(new LinkedHashMap<String, FetchWork>());
+          bigTableLocalWork.setTmpPath(tmpPath);
+
+          // TODO: set inputFileChangeSensitive and BucketMapjoinContext,
+          // TODO: enable non-staged mapjoin
+        }
+      }
+    }
+
     // Create a new SparkTask for the specified SparkWork, recursively computing
     // all the parent SparkTasks that this new task depends on, if they don't already exist.
     private SparkTask createSparkTask(SparkTask originalTask,
@@ -167,7 +214,11 @@ public class SparkMapJoinResolver implem
         throws SemanticException {
       Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
       if (currentTask instanceof SparkTask) {
-        SparkWork sparkWork = ((SparkTask) currentTask).getWork();
+        SparkTask sparkTask = (SparkTask) currentTask;
+        SparkWork sparkWork = sparkTask.getWork();
+
+        // Generate MapredLocalWorks for MJ and HTS
+        generateLocalWork(sparkTask);
 
         dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
         Set<BaseWork> leaves = sparkWork.getLeaves();
@@ -187,7 +238,7 @@ public class SparkMapJoinResolver implem
 
         // Now create SparkTasks from the SparkWorks, also set up dependency
         for (SparkWork work : dependencyGraph.keySet()) {
-          createSparkTask((SparkTask)currentTask, work, createdTaskMap);
+          createSparkTask(sparkTask, work, createdTaskMap);
         }
       }
 

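The heart of generateLocalWork() above is a rendezvous on one task-scoped tmp directory: each MapJoin (big-table) work records it via setTmpPath(), and every SparkHashTableSink parent receives the same path via setTmpHDFSPath(), so the sinks' dumps land exactly where the loader looks. A minimal illustration of the path derivation; generateTmpPath() below is a hypothetical stand-in for Utilities.generateTmpPath, and the suffix scheme shown is an assumption:

    import org.apache.hadoop.fs.Path;

    public class TmpPathRendezvous {
      // Hypothetical stand-in for Utilities.generateTmpPath: derives a
      // task-scoped subdirectory under the query's MR scratch dir.
      static Path generateTmpPath(Path basePath, String taskId) {
        return new Path(basePath, "_tmp." + taskId);  // suffix scheme assumed
      }

      public static void main(String[] args) {
        Path mrScratch = new Path("hdfs://nn/tmp/hive/scratch");
        Path shared = generateTmpPath(mrScratch, "Stage-3");
        // Sink side:   parentWork.getMapRedLocalWork().setTmpHDFSPath(shared)
        // Loader side: bigTableLocalWork.setTmpPath(shared)
        // Both resolve to the same directory.
        System.out.println(shared);
      }
    }
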
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java?rev=1639569&r1=1639568&r2=1639569&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java Fri Nov 14 05:15:49 2014
@@ -45,7 +45,7 @@ public class MapredLocalWork implements 
   private BucketMapJoinContext bucketMapjoinContext;
   private Path tmpPath;
   private String stageID;
-  // Temp HDFS path for Spark HashTable sink and loader
+  // Temp HDFS path for Spark HashTable sink
   private Path tmpHDFSPath;
 
   private List<Operator<? extends OperatorDesc>> dummyParentOp;
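
With this change the two temp-path fields divide the responsibility: tmpHDFSPath is set for the Spark HashTable sink side only, while tmpPath is what HashTableLoader now reads (first hunk above). A sketch of the accessor pair as implied by the callers in SparkMapJoinResolver and HashTableLoader; the field declarations match the hunk above, and the method bodies are the conventional ones, shown for illustration:

    import org.apache.hadoop.fs.Path;

    public class MapredLocalWorkPathsSketch {
      private Path tmpPath;      // read by HashTableLoader (big-table side)
      private Path tmpHDFSPath;  // temp HDFS path for the Spark HashTable sink

      public Path getTmpPath() { return tmpPath; }
      public void setTmpPath(Path tmpPath) { this.tmpPath = tmpPath; }

      public Path getTmpHDFSPath() { return tmpHDFSPath; }
      public void setTmpHDFSPath(Path tmpHDFSPath) { this.tmpHDFSPath = tmpHDFSPath; }
    }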