You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/05 07:38:40 UTC
svn commit: r1615864 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/
parse/spark/ plan/
Author: brock
Date: Tue Aug 5 05:38:40 2014
New Revision: 1615864
URL: http://svn.apache.org/r1615864
Log:
HIVE-7527 - Support order by and sort by on Spark (Rui Li via Brock) [Spark Branch]
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java Tue Aug 5 05:38:40 2014
@@ -7,8 +7,11 @@ public class GroupByShuffler implements
@Override
public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
- JavaPairRDD<BytesWritable, BytesWritable> input) {
- return input.groupByKey(/* default to hash partition */);
+ JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions) {
+ if (numPartitions > 0) {
+ return input.groupByKey(numPartitions);
+ }
+ return input.groupByKey();
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java Tue Aug 5 05:38:40 2014
@@ -24,11 +24,12 @@ import org.apache.spark.api.java.JavaPai
public class ReduceTran implements SparkTran {
private SparkShuffler shuffler;
private HiveReduceFunction reduceFunc;
+ private int numPartitions;
@Override
public JavaPairRDD<BytesWritable, BytesWritable> transform(
JavaPairRDD<BytesWritable, BytesWritable> input) {
- return shuffler.shuffle(input).mapPartitionsToPair(reduceFunc);
+ return shuffler.shuffle(input, numPartitions).mapPartitionsToPair(reduceFunc);
}
public void setReduceFunction(HiveReduceFunction redFunc) {
@@ -39,4 +40,8 @@ public class ReduceTran implements Spark
this.shuffler = shuffler;
}
+ public void setNumPartitions(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java Tue Aug 5 05:38:40 2014
@@ -18,8 +18,9 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import java.util.Iterator;
+import java.util.*;
+import com.google.common.collect.Ordering;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -30,10 +31,12 @@ public class SortByShuffler implements S
@Override
public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
- JavaPairRDD<BytesWritable, BytesWritable> input) {
- JavaPairRDD<BytesWritable, BytesWritable> rdd = input.sortByKey();
+ JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions) {
+ Comparator comp = Ordering.<BytesWritable>natural();
+ // Due to HIVE-7540, numPartitions must be set to 1
+ JavaPairRDD<BytesWritable, BytesWritable> rdd = input.sortByKey(comp, true, 1);
return rdd.mapPartitionsToPair(new ShuffleFunction());
- };
+ }
private static class ShuffleFunction implements
PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>,
@@ -45,10 +48,10 @@ public class SortByShuffler implements S
public Iterable<Tuple2<BytesWritable, Iterable<BytesWritable>>> call(
final Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
// Use input iterator to back returned iterable object.
- final Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> resultIt =
+ final Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> resultIt =
new Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>>() {
BytesWritable curKey = null;
- BytesWritable curValue = null;
+ List<BytesWritable> curValues = new ArrayList<BytesWritable>();
@Override
public boolean hasNext() {
@@ -60,13 +63,33 @@ public class SortByShuffler implements S
// TODO: implement this by accumulating rows with the same key into a list.
// Note that this list needs to improved to prevent excessive memory usage, but this
// can be done in later phase.
- return null;
+ while (it.hasNext()) {
+ Tuple2<BytesWritable, BytesWritable> pair = it.next();
+ if (curKey != null && !curKey.equals(pair._1())) {
+ BytesWritable key = curKey;
+ List<BytesWritable> values = curValues;
+ curKey = pair._1();
+ curValues = new ArrayList<BytesWritable>();
+ curValues.add(pair._2());
+ return new Tuple2<BytesWritable, Iterable<BytesWritable>>(key, values);
+ }
+ curKey = pair._1();
+ curValues.add(pair._2());
+ }
+ if (curKey == null) {
+ throw new NoSuchElementException();
+ }
+ // if we get here, this should be the last element we have
+ BytesWritable key = curKey;
+ curKey = null;
+ return new Tuple2<BytesWritable, Iterable<BytesWritable>>(key, curValues);
}
@Override
public void remove() {
// Not implemented.
// throw Unsupported Method Invocation Exception.
+ throw new UnsupportedOperationException();
}
};
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Aug 5 05:38:40 2014
@@ -67,6 +67,7 @@ public class SparkPlanGenerator {
SparkShuffler st = generate(edge);
ReduceTran rt = generate(child);
rt.setShuffler(st);
+ rt.setNumPartitions(edge.getNumPartitions());
trans.add(rt);
w = child;
}
@@ -110,7 +111,9 @@ public class SparkPlanGenerator {
}
private SparkShuffler generate(SparkEdgeProperty edge) {
- // TODO: create different shuffler based on edge prop.
+ if (edge.isShuffleSort()){
+ return new SortByShuffler();
+ }
return new GroupByShuffler();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java Tue Aug 5 05:38:40 2014
@@ -24,6 +24,6 @@ import org.apache.spark.api.java.JavaPai
public interface SparkShuffler {
JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
- JavaPairRDD<BytesWritable, BytesWritable> input);
+ JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Tue Aug 5 05:38:40 2014
@@ -28,31 +28,18 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
+import com.google.common.base.Strings;
import org.apache.hadoop.fs.Path;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.apache.hadoop.hive.ql.plan.*;
import com.google.common.base.Preconditions;
@@ -142,12 +129,15 @@ public class GenSparkUtils {
sparkWork.add(reduceWork);
- SparkEdgeProperty edgeProp;
- if (reduceWork.isAutoReduceParallelism()) {
- edgeProp =
- new SparkEdgeProperty(0);
- } else {
- edgeProp = new SparkEdgeProperty(0);
+ SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
+ reduceSink.getConf().getNumReducers());
+
+ if (root instanceof GroupByOperator) {
+ edgeProp.setShuffleGroup();
+ }
+ String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim();
+ if (!sortOrder.isEmpty()) {
+ edgeProp.setShuffleSort();
}
sparkWork.connect(
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Tue Aug 5 05:38:40 2014
@@ -25,14 +25,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Stack;
+import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -274,13 +271,14 @@ public class GenSparkWork implements Nod
if (!context.connectedReduceSinks.contains(rs)) {
// add dependency between the two work items
- SparkEdgeProperty edgeProp;
- if (rWork.isAutoReduceParallelism()) {
- edgeProp =
- new SparkEdgeProperty(0/*context.conf, EdgeType.SIMPLE_EDGE, true,
- rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer*/);
- } else {
- edgeProp = new SparkEdgeProperty(0/*EdgeType.SIMPLE_EDGE*/);
+ SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
+ rs.getConf().getNumReducers());
+ if(rWork.getReducer() instanceof GroupByOperator){
+ edgeProp.setShuffleGroup();
+ }
+ String sortOrder = Strings.nullToEmpty(rs.getConf().getOrder()).trim();
+ if (!sortOrder.isEmpty()) {
+ edgeProp.setShuffleSort();
}
sparkWork.connect(work, rWork, edgeProp);
context.connectedReduceSinks.add(rs);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Tue Aug 5 05:38:40 2014
@@ -33,8 +33,8 @@ public class SparkEdgeProperty {
this.edgeType = edgeType;
this.numPartitions = numPartitions;
}
-
- public SparkEdgeProperty(int edgeType) {
+
+ public SparkEdgeProperty(long edgeType) {
this.edgeType = edgeType;
}