Posted to commits@hive.apache.org by br...@apache.org on 2014/08/08 03:07:13 UTC

svn commit: r1616652 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark: GenSparkWork.java SparkCompiler.java

Author: brock
Date: Fri Aug  8 01:07:13 2014
New Revision: 1616652

URL: http://svn.apache.org/r1616652
Log:
HIVE-7584 - Change SparkCompiler to generate a SparkWork that contains UnionWork from logical operator tree (Na Yang via Brock) [Spark Branch]

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1616652&r1=1616651&r2=1616652&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Fri Aug  8 01:07:13 2014
@@ -226,7 +226,7 @@ public class GenSparkWork implements Nod
       // finally hook everything up
       LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")");
       SparkEdgeProperty edgeProp = new SparkEdgeProperty(0/*EdgeType.CONTAINS*/);
-      sparkWork.connect(unionWork, work, edgeProp);
+      sparkWork.connect(work, unionWork, edgeProp);
       unionWork.addUnionOperators(context.currentUnionOperators);
       context.currentUnionOperators.clear();
       context.workWithUnionOperators.add(work);
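
The one-line fix above reverses the edge direction when hooking a union into the plan: before this commit the UnionWork was connected as the parent of the current work, whereas afterwards the current work is the parent and the UnionWork its child, matching the data flow in which each branch work feeds its rows into the union. A minimal standalone sketch of that parent/child connect semantics (WorkGraphSketch and its methods are hypothetical stand-ins for illustration, not Hive's actual SparkWork API):

    import java.util.*;

    public class WorkGraphSketch {
      // Hypothetical stand-in: connect(parent, child) records that 'parent'
      // produces data consumed by 'child', which is what SparkWork.connect
      // is assumed to do with its first two arguments.
      private final Map<String, List<String>> children = new HashMap<>();

      void connect(String parent, String child) {
        children.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
      }

      public static void main(String[] args) {
        WorkGraphSketch g = new WorkGraphSketch();
        // After HIVE-7584: each branch work is a parent of the union work.
        g.connect("MapWork-1", "UnionWork-3");
        g.connect("MapWork-2", "UnionWork-3");
        System.out.println(g.children);
        // {MapWork-1=[UnionWork-3], MapWork-2=[UnionWork-3]}
      }
    }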

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1616652&r1=1616651&r2=1616652&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Fri Aug  8 01:07:13 2014
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.For
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -152,19 +155,19 @@ public class SparkCompiler extends TaskC
         TableScanOperator.getOperatorName() + "%"),
         new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));
 
-//    opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
-//        new NodeProcessor() {
-//      @Override
-//      public Object process(Node n, Stack<Node> s,
-//          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-//        GenSparkProcContext context = (GenSparkProcContext) procCtx;
-//        UnionOperator union = (UnionOperator) n;
-//
-//        // simply need to remember that we've seen a union.
-//        context.currentUnionOperators.add(union);
-//        return null;
-//      }
-//    });
+      opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
+          new NodeProcessor() {
+        @Override
+        public Object process(Node n, Stack<Node> s,
+          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+          GenSparkProcContext context = (GenSparkProcContext) procCtx;
+          UnionOperator union = (UnionOperator) n;
+
+          // simply need to remember that we've seen a union.
+          context.currentUnionOperators.add(union);
+          return null;
+        }
+      });
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
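
The re-enabled rule above registers an anonymous NodeProcessor so the operator-tree walker records every UnionOperator it encounters in context.currentUnionOperators; GenSparkWork then consumes that list (see the first hunk) when wiring the UnionWork into the graph. A minimal standalone sketch of the pattern-keyed dispatch idea (the Processor interface and the string-prefix matching below are simplified stand-ins, not Hive's Rule/Dispatcher/NodeProcessor API):

    import java.util.*;

    public class RuleDispatchSketch {
      // Simplified stand-in for Hive's NodeProcessor: react to one node.
      interface Processor {
        void process(String node, List<String> ctx);
      }

      public static void main(String[] args) {
        Map<String, Processor> rules = new LinkedHashMap<>();
        List<String> currentUnionOperators = new ArrayList<>();

        // "Remember union": simply record that we've seen a union operator.
        rules.put("UNION", (node, ctx) -> ctx.add(node));

        // Walk a toy operator tree; fire the first rule whose key
        // prefixes the node name, mimicking RuleRegExp matching.
        for (String op : Arrays.asList("TS_0", "UNION_3", "FS_5")) {
          for (Map.Entry<String, Processor> e : rules.entrySet()) {
            if (op.startsWith(e.getKey())) {
              e.getValue().process(op, currentUnionOperators);
              break;
            }
          }
        }
        System.out.println(currentUnionOperators); // [UNION_3]
      }
    }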