You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/18 21:05:14 UTC

svn commit: r1618707 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/SparkPlanGenerator.java parse/spark/GenSparkUtils.java parse/spark/GenSparkWork.java plan/SparkEdgeProperty.java

Author: brock
Date: Mon Aug 18 19:05:13 2014
New Revision: 1618707

URL: http://svn.apache.org/r1618707
Log:
HIVE-7528 - Support CLUSTER BY and DISTRIBUTE BY [Spark Branch] (Rui Li via Brock)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1618707&r1=1618706&r2=1618707&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Mon Aug 18 19:05:13 2014
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -201,6 +202,8 @@ public class SparkPlanGenerator {
   }
 
   private SparkShuffler generate(SparkEdgeProperty edge) {
+    Preconditions.checkArgument(!edge.isShuffleNone(),
+        "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     if (edge.isShuffleSort()) {
       return new SortByShuffler();
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1618707&r1=1618706&r2=1618707&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Mon Aug 18 19:05:13 2014
@@ -102,12 +102,10 @@ public class GenSparkUtils {
 
     sparkWork.add(reduceWork);
 
-    SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
+    // Use group-by as the default shuffler
+    SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_GROUP,
         reduceWork.getNumReduceTasks());
 
-    if (root instanceof GroupByOperator) {
-      edgeProp.setShuffleGroup();
-    }
     String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim();
     if (!sortOrder.isEmpty() && isSortNecessary(reduceSink)) {
       edgeProp.setShuffleSort();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1618707&r1=1618706&r2=1618707&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Mon Aug 18 19:05:13 2014
@@ -225,7 +225,7 @@ public class GenSparkWork implements Nod
 
       // finally hook everything up
       LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")");
-      SparkEdgeProperty edgeProp = new SparkEdgeProperty(0/*EdgeType.CONTAINS*/);
+      SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE);
       sparkWork.connect(work, unionWork, edgeProp);
       unionWork.addUnionOperators(context.currentUnionOperators);
       context.currentUnionOperators.clear();
@@ -271,11 +271,9 @@ public class GenSparkWork implements Nod
 
       if (!context.connectedReduceSinks.contains(rs)) {
         // add dependency between the two work items
-        SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
+        // Use group-by as the default shuffler
+        SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_GROUP,
             rs.getConf().getNumReducers());
-        if(rWork.getReducer() instanceof GroupByOperator){
-          edgeProp.setShuffleGroup();
-        }
         String sortOrder = Strings.nullToEmpty(rs.getConf().getOrder()).trim();
         if (!sortOrder.isEmpty() && GenSparkUtils.isSortNecessary(rs)) {
           edgeProp.setShuffleSort();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1618707&r1=1618706&r2=1618707&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Mon Aug 18 19:05:13 2014
@@ -77,11 +77,10 @@ public class SparkEdgeProperty {
       sb.append("GROUP");
     }
 
-    if (sb.length() != 0) {
-      sb.append(" ");
-    }
-
     if (isShuffleSort()) {
+      if (sb.length() != 0) {
+        sb.append(" ");
+      }
       sb.append("SORT");
     }