You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/05 07:38:40 UTC

svn commit: r1615864 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/ parse/spark/ plan/

Author: brock
Date: Tue Aug  5 05:38:40 2014
New Revision: 1615864

URL: http://svn.apache.org/r1615864
Log:
HIVE-7527 - Support order by and sort by on Spark (Rui Li via Brock) [Spark Branch]

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java Tue Aug  5 05:38:40 2014
@@ -7,8 +7,11 @@ public class GroupByShuffler implements 
 
   @Override
   public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
-      JavaPairRDD<BytesWritable, BytesWritable> input) {
-    return input.groupByKey(/* default to hash partition */);
+      JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions) {
+    if (numPartitions > 0) {
+      return input.groupByKey(numPartitions);
+    }
+    return input.groupByKey();
   }
 
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java Tue Aug  5 05:38:40 2014
@@ -24,11 +24,12 @@ import org.apache.spark.api.java.JavaPai
 public class ReduceTran implements SparkTran {
   private SparkShuffler shuffler;
   private HiveReduceFunction reduceFunc;
+  private int numPartitions;
 
   @Override
   public JavaPairRDD<BytesWritable, BytesWritable> transform(
       JavaPairRDD<BytesWritable, BytesWritable> input) {
-    return shuffler.shuffle(input).mapPartitionsToPair(reduceFunc);
+    return shuffler.shuffle(input, numPartitions).mapPartitionsToPair(reduceFunc);
   }
 
   public void setReduceFunction(HiveReduceFunction redFunc) {
@@ -39,4 +40,8 @@ public class ReduceTran implements Spark
     this.shuffler = shuffler;
   }
 
+  public void setNumPartitions(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java Tue Aug  5 05:38:40 2014
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.util.Iterator;
+import java.util.*;
 
+import com.google.common.collect.Ordering;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -30,10 +31,12 @@ public class SortByShuffler implements S
 
   @Override
   public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
-      JavaPairRDD<BytesWritable, BytesWritable> input) {
-    JavaPairRDD<BytesWritable, BytesWritable> rdd = input.sortByKey();
+      JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions) {
+    Comparator comp = Ordering.<BytesWritable>natural();
+    // Due to HIVE-7540, numPartitions must be set to 1
+    JavaPairRDD<BytesWritable, BytesWritable> rdd = input.sortByKey(comp, true, 1);
     return rdd.mapPartitionsToPair(new ShuffleFunction());
-  };
+  }
 
   private static class ShuffleFunction implements
   PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>,
@@ -45,10 +48,10 @@ public class SortByShuffler implements S
     public Iterable<Tuple2<BytesWritable, Iterable<BytesWritable>>> call(
         final Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
       // Use input iterator to back returned iterable object.
-      final Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> resultIt = 
+      final Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> resultIt =
           new Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>>() {
         BytesWritable curKey = null;
-        BytesWritable curValue = null;
+        List<BytesWritable> curValues = new ArrayList<BytesWritable>();
 
         @Override
         public boolean hasNext() {
@@ -60,13 +63,33 @@ public class SortByShuffler implements S
           // TODO: implement this by accumulating rows with the same key into a list.
           // Note that this list needs to be improved to prevent excessive memory usage, but this
           // can be done in a later phase.
-          return null;
+          while (it.hasNext()) {
+            Tuple2<BytesWritable, BytesWritable> pair = it.next();
+            if (curKey != null && !curKey.equals(pair._1())) {
+              BytesWritable key = curKey;
+              List<BytesWritable> values = curValues;
+              curKey = pair._1();
+              curValues = new ArrayList<BytesWritable>();
+              curValues.add(pair._2());
+              return new Tuple2<BytesWritable, Iterable<BytesWritable>>(key, values);
+            }
+            curKey = pair._1();
+            curValues.add(pair._2());
+          }
+          if (curKey == null) {
+            throw new NoSuchElementException();
+          }
+          // if we get here, this should be the last element we have
+          BytesWritable key = curKey;
+          curKey = null;
+          return new Tuple2<BytesWritable, Iterable<BytesWritable>>(key, curValues);
         }
 
         @Override
         public void remove() {
           // Not implemented.
           // throw Unsupported Method Invocation Exception.
+          throw new UnsupportedOperationException();
         }
 
       };

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Aug  5 05:38:40 2014
@@ -67,6 +67,7 @@ public class SparkPlanGenerator {
       SparkShuffler st = generate(edge);
       ReduceTran rt = generate(child);
       rt.setShuffler(st);
+      rt.setNumPartitions(edge.getNumPartitions());
       trans.add(rt);
       w = child;
     }
@@ -110,7 +111,9 @@ public class SparkPlanGenerator {
   }
 
   private SparkShuffler generate(SparkEdgeProperty edge) {
-    // TODO: create different shuffler based on edge prop.
+    if (edge.isShuffleSort()){
+      return new SortByShuffler();
+    }
     return new GroupByShuffler();
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java Tue Aug  5 05:38:40 2014
@@ -24,6 +24,6 @@ import org.apache.spark.api.java.JavaPai
 public interface SparkShuffler {
 
   JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
-      JavaPairRDD<BytesWritable, BytesWritable> input);
+      JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions);
 
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Tue Aug  5 05:38:40 2014
@@ -28,31 +28,18 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.fs.Path;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.apache.hadoop.hive.ql.plan.*;
 
 import com.google.common.base.Preconditions;
 
@@ -142,12 +129,15 @@ public class GenSparkUtils {
 
     sparkWork.add(reduceWork);
 
-    SparkEdgeProperty edgeProp;
-    if (reduceWork.isAutoReduceParallelism()) {
-      edgeProp =
-          new SparkEdgeProperty(0);
-    } else {
-      edgeProp = new SparkEdgeProperty(0);
+    SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
+        reduceSink.getConf().getNumReducers());
+
+    if (root instanceof GroupByOperator) {
+      edgeProp.setShuffleGroup();
+    }
+    String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim();
+    if (!sortOrder.isEmpty()) {
+      edgeProp.setShuffleSort();
     }
 
     sparkWork.connect(

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Tue Aug  5 05:38:40 2014
@@ -25,14 +25,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Stack;
 
+import com.google.common.base.Strings;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -274,13 +271,14 @@ public class GenSparkWork implements Nod
 
       if (!context.connectedReduceSinks.contains(rs)) {
         // add dependency between the two work items
-        SparkEdgeProperty edgeProp;
-        if (rWork.isAutoReduceParallelism()) {
-          edgeProp =
-              new SparkEdgeProperty(0/*context.conf, EdgeType.SIMPLE_EDGE, true,
-                  rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer*/);
-        } else {
-          edgeProp = new SparkEdgeProperty(0/*EdgeType.SIMPLE_EDGE*/);
+        SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
+            rs.getConf().getNumReducers());
+        if(rWork.getReducer() instanceof GroupByOperator){
+          edgeProp.setShuffleGroup();
+        }
+        String sortOrder = Strings.nullToEmpty(rs.getConf().getOrder()).trim();
+        if (!sortOrder.isEmpty()) {
+          edgeProp.setShuffleSort();
         }
         sparkWork.connect(work, rWork, edgeProp);
         context.connectedReduceSinks.add(rs);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1615864&r1=1615863&r2=1615864&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Tue Aug  5 05:38:40 2014
@@ -33,8 +33,8 @@ public class SparkEdgeProperty {
     this.edgeType = edgeType;
     this.numPartitions = numPartitions;
   }
-  
-  public SparkEdgeProperty(int edgeType) {
+
+  public SparkEdgeProperty(long edgeType) {
     this.edgeType = edgeType;
   }