You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/18 21:05:14 UTC
svn commit: r1618707 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/SparkPlanGenerator.java parse/spark/GenSparkUtils.java parse/spark/GenSparkWork.java plan/SparkEdgeProperty.java
Author: brock
Date: Mon Aug 18 19:05:13 2014
New Revision: 1618707
URL: http://svn.apache.org/r1618707
Log:
HIVE-7528 - Support cluster by and distributed by [Spark Branch] (Rui Li via Brock)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1618707&r1=1618706&r2=1618707&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Mon Aug 18 19:05:13 2014
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -201,6 +202,8 @@ public class SparkPlanGenerator {
}
private SparkShuffler generate(SparkEdgeProperty edge) {
+ Preconditions.checkArgument(!edge.isShuffleNone(),
+ "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
if (edge.isShuffleSort()) {
return new SortByShuffler();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1618707&r1=1618706&r2=1618707&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Mon Aug 18 19:05:13 2014
@@ -102,12 +102,10 @@ public class GenSparkUtils {
sparkWork.add(reduceWork);
- SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
+ // Use group-by as the default shuffler
+ SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_GROUP,
reduceWork.getNumReduceTasks());
- if (root instanceof GroupByOperator) {
- edgeProp.setShuffleGroup();
- }
String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim();
if (!sortOrder.isEmpty() && isSortNecessary(reduceSink)) {
edgeProp.setShuffleSort();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1618707&r1=1618706&r2=1618707&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Mon Aug 18 19:05:13 2014
@@ -225,7 +225,7 @@ public class GenSparkWork implements Nod
// finally hook everything up
LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")");
- SparkEdgeProperty edgeProp = new SparkEdgeProperty(0/*EdgeType.CONTAINS*/);
+ SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE);
sparkWork.connect(work, unionWork, edgeProp);
unionWork.addUnionOperators(context.currentUnionOperators);
context.currentUnionOperators.clear();
@@ -271,11 +271,9 @@ public class GenSparkWork implements Nod
if (!context.connectedReduceSinks.contains(rs)) {
// add dependency between the two work items
- SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
+ // Use group-by as the default shuffler
+ SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_GROUP,
rs.getConf().getNumReducers());
- if(rWork.getReducer() instanceof GroupByOperator){
- edgeProp.setShuffleGroup();
- }
String sortOrder = Strings.nullToEmpty(rs.getConf().getOrder()).trim();
if (!sortOrder.isEmpty() && GenSparkUtils.isSortNecessary(rs)) {
edgeProp.setShuffleSort();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1618707&r1=1618706&r2=1618707&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Mon Aug 18 19:05:13 2014
@@ -77,11 +77,10 @@ public class SparkEdgeProperty {
sb.append("GROUP");
}
- if (sb.length() != 0) {
- sb.append(" ");
- }
-
if (isShuffleSort()) {
+ if (sb.length() != 0) {
+ sb.append(" ");
+ }
sb.append("SORT");
}