You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/10 14:49:03 UTC

svn commit: r1637855 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: optimizer/physical/SparkMapJoinResolver.java plan/SparkWork.java

Author: xuefu
Date: Mon Nov 10 13:49:03 2014
New Revision: 1637855

URL: http://svn.apache.org/r1637855
Log:
HIVE-8622: Split map-join plan into 2 SparkTasks in 3 stages [Spark Branch] (Chao via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1637855&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Mon Nov 10 13:49:03 2014
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+public class SparkMapJoinResolver implements PhysicalPlanResolver {
+
+  /**
+   * Starts a {@link TaskGraphWalker}, driven by a {@link SparkMapJoinTaskDispatcher},
+   * at the root tasks of the given physical context.
+   *
+   * @param pctx the physical context whose root tasks seed the walk
+   * @return the same {@code pctx} instance that was passed in
+   * @throws SemanticException propagated from the graph walk
+   */
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    // Seed the walk with a private copy of the current root tasks.
+    List<Node> rootNodes = new ArrayList<Node>(pctx.getRootTasks());
+
+    TaskGraphWalker walker = new TaskGraphWalker(new SparkMapJoinTaskDispatcher(pctx));
+    walker.startWalking(rootNodes, null);
+    return pctx;
+  }
+
+  /**
+   * Tells whether the operators of the specified BaseWork include at least one
+   * operator that is an instance of the specified class.
+   *
+   * @param work the BaseWork whose operators are inspected
+   * @param clazz the operator class to look for
+   * @return true if a matching operator is found, false otherwise
+   */
+  private boolean containsOp(BaseWork work, Class<?> clazz) {
+    boolean found = false;
+    for (Operator<? extends OperatorDesc> candidate : work.getAllOperators()) {
+      found = clazz.isInstance(candidate);
+      if (found) {
+        // One match is enough; no need to look at the remaining operators
+        break;
+      }
+    }
+    return found;
+  }
+
+  class SparkMapJoinTaskDispatcher implements Dispatcher {
+
+    // Physical context handed to the dispatcher at construction time
+    private final PhysicalContext physicalContext;
+
+    // For each BaseWork with an MJ operator, records the SparkWork that is built
+    // for its small table BaseWorks
+    private final Map<BaseWork, SparkWork> sparkWorkMap = new HashMap<BaseWork, SparkWork>();
+
+    // SparkWork dependency graph - maps a SparkWork with MJ operators to all
+    // of its parent SparkWorks for the small tables
+    private final Map<SparkWork, List<SparkWork>> dependencyGraph =
+        new HashMap<SparkWork, List<SparkWork>>();
+
+    /**
+     * Creates a dispatcher with empty bookkeeping maps.
+     *
+     * @param pc the physical context this dispatcher operates on
+     */
+    public SparkMapJoinTaskDispatcher(PhysicalContext pc) {
+      this.physicalContext = pc;
+    }
+
+    /**
+     * Moves the specified work, and recursively its parent works, from the sparkWork
+     * to the targetWork.
+     *
+     * In order not to break the graph (it is still needed for the edges), the work is
+     * not removed from the sparkWork here; the removal is done later by the caller.
+     *
+     * @param sparkWork the original SparkWork; only read by this method
+     * @param work the BaseWork to move
+     * @param targetWork the SparkWork receiving the work; nothing is added when it is
+     *        the very same instance as {@code sparkWork}
+     */
+    private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork) {
+      List<BaseWork> parents = sparkWork.getParents(work);
+
+      if (sparkWork != targetWork) {
+        targetWork.add(work);
+
+        // Re-create the edge to every child of this work that has already been
+        // added to the targetWork earlier
+        for (BaseWork child : sparkWork.getChildren(work)) {
+          if (!targetWork.contains(child)) {
+            continue;
+          }
+          targetWork.connect(work, child, sparkWork.getEdgeProperty(work, child));
+        }
+      }
+
+      if (!containsOp(work, MapJoinOperator.class)) {
+        // No map join in this work: all parents simply follow it into the targetWork
+        for (BaseWork parent : parents) {
+          moveWork(sparkWork, parent, targetWork);
+        }
+        return;
+      }
+
+      // Create a new SparkWork for all the small tables of this work
+      SparkWork smallTableWork =
+          new SparkWork(physicalContext.conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+
+      // Update dependency graph: the targetWork now depends on the new SparkWork
+      List<SparkWork> upstreamWorks = dependencyGraph.get(targetWork);
+      if (upstreamWorks == null) {
+        upstreamWorks = new ArrayList<SparkWork>();
+        dependencyGraph.put(targetWork, upstreamWorks);
+      }
+      upstreamWorks.add(smallTableWork);
+
+      // Remember which SparkWork was created for the small tables of this work
+      sparkWorkMap.put(work, smallTableWork);
+
+      // Parents with a HashTableSink operator go to the new SparkWork,
+      // every other parent stays with the targetWork
+      for (BaseWork parent : parents) {
+        SparkWork destination =
+            containsOp(parent, HashTableSinkOperator.class) ? smallTableWork : targetWork;
+        moveWork(sparkWork, parent, destination);
+      }
+    }
+
+    /**
+     * Returns the SparkTask for the specified SparkWork, creating it if it does not
+     * already exist. The parent SparkTasks that the new task depends on (as recorded
+     * in {@code dependencyGraph}) are obtained recursively in the same way.
+     *
+     * A new task whose SparkWork has an entry in the dependency graph is added as a
+     * dependent of each of its parent SparkTasks. Otherwise it is added as a dependent
+     * of the original task's parent tasks or, when the original task has none, it is
+     * registered as a root task and the original task is removed from the root tasks.
+     *
+     * @param originalTask the task whose SparkWork is being split
+     * @param sparkWork the SparkWork to get a task for
+     * @param createdTaskMap tasks created so far, keyed by their SparkWork; every task
+     *        created by this method is recorded here
+     * @return the SparkTask for {@code sparkWork}
+     */
+    private SparkTask createSparkTask(Task<? extends Serializable> originalTask,
+                                      SparkWork sparkWork,
+                                      Map<SparkWork, SparkTask> createdTaskMap) {
+      if (createdTaskMap.containsKey(sparkWork)) {
+        return createdTaskMap.get(sparkWork);
+      }
+      SparkTask resultTask = (SparkTask) TaskFactory.get(sparkWork, physicalContext.conf);
+
+      // Record the task right away. Without this the lookup above never hits, and a
+      // SparkWork reached more than once would end up with several SparkTasks.
+      createdTaskMap.put(sparkWork, resultTask);
+
+      List<SparkWork> parentWorks = dependencyGraph.get(sparkWork);
+      if (parentWorks != null) {
+        for (SparkWork parentWork : parentWorks) {
+          SparkTask parentTask = createSparkTask(originalTask, parentWork, createdTaskMap);
+          parentTask.addDependentTask(resultTask);
+        }
+      } else {
+        List<Task<? extends Serializable>> parentTasks = originalTask.getParentTasks();
+        if (parentTasks != null && parentTasks.size() > 0) {
+          for (Task<? extends Serializable> parentTask : parentTasks) {
+            parentTask.addDependentTask(resultTask);
+          }
+        } else {
+          physicalContext.addToRootTask(resultTask);
+          physicalContext.removeFromRootTask(originalTask);
+        }
+      }
+      return resultTask;
+    }
+
+    /**
+     * Processes one node of the task graph. Only SparkTasks are handled: the BaseWorks
+     * of the task's SparkWork are redistributed starting from its leaves, the BaseWorks
+     * that ended up in newly created SparkWorks are removed from the original SparkWork,
+     * and SparkTasks are created for the SparkWorks in the dependency graph.
+     *
+     * @param nd the task graph node to process
+     * @param stack not used by this dispatcher
+     * @param nos not used by this dispatcher
+     * @return always null
+     * @throws SemanticException declared by the Dispatcher interface
+     */
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nos)
+        throws SemanticException {
+      Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
+      if (currentTask instanceof SparkTask) {
+        // The bookkeeping maps describe a single SparkTask. Start from a clean slate so
+        // that SparkWorks recorded for a previously dispatched task are neither removed
+        // from this task's SparkWork nor turned into SparkTasks a second time.
+        sparkWorkMap.clear();
+        dependencyGraph.clear();
+
+        SparkWork sparkWork = ((SparkTask) currentTask).getWork();
+        Set<BaseWork> leaves = sparkWork.getLeaves();
+        for (BaseWork leaf : leaves) {
+          moveWork(sparkWork, leaf, sparkWork);
+        }
+
+        // Now remove all BaseWorks in all the childSparkWorks that we created
+        // from the original SparkWork
+        for (SparkWork newSparkWork : sparkWorkMap.values()) {
+          for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
+            sparkWork.remove(work);
+          }
+        }
+
+        Map<SparkWork, SparkTask> createdTaskMap = new HashMap<SparkWork, SparkTask>();
+
+        // Now create SparkTasks from the SparkWorks, also set up dependency
+        for (SparkWork work : dependencyGraph.keySet()) {
+          createSparkTask(currentTask, work, createdTaskMap);
+        }
+      }
+
+      return null;
+    }
+  }
+}
\ No newline at end of file

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1637855&r1=1637854&r2=1637855&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Mon Nov 10 13:49:03 2014
@@ -134,6 +134,15 @@ public class SparkWork extends AbstractO
   }
 
   /**
+   * Checks whether the specified BaseWork is a vertex (a key of workGraph) in this graph.
+   * @param w the BaseWork to look up
+   * @return true if the specified BaseWork is in this graph, false otherwise
+   */
+  public boolean contains(BaseWork w) {
+    return workGraph.containsKey(w);
+  }
+
+  /**
    * add creates a new node in the graph without any connections
    */
   public void add(BaseWork w) {