Posted to commits@hive.apache.org by xu...@apache.org on 2017/01/20 20:57:01 UTC

hive git commit: HIVE-15580: Eliminate unbounded memory usage for orderBy and groupBy in Hive on Spark (reviewed by Chao Sun)

Repository: hive
Updated Branches:
  refs/heads/master f968cf78a -> 811b3e39e


HIVE-15580: Eliminate unbounded memory usage for orderBy and groupBy in Hive on Spark (reviewed by Chao Sun)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/811b3e39
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/811b3e39
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/811b3e39

Branch: refs/heads/master
Commit: 811b3e39ed569232c4f138c1287109ef8ebce132
Parents: f968cf7
Author: Xuefu Zhang <xu...@uber.com>
Authored: Fri Jan 20 12:56:49 2017 -0800
Committer: Xuefu Zhang <xu...@uber.com>
Committed: Fri Jan 20 12:56:49 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/spark/GroupByShuffler.java     | 10 +--
 .../hive/ql/exec/spark/HiveReduceFunction.java  |  4 +-
 .../spark/HiveReduceFunctionResultList.java     |  8 +--
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   |  4 +-
 .../hadoop/hive/ql/exec/spark/ShuffleTran.java  |  6 +-
 .../hive/ql/exec/spark/SortByShuffler.java      | 65 +-------------------
 .../hive/ql/exec/spark/SparkPlanGenerator.java  |  7 ---
 .../ql/exec/spark/SparkReduceRecordHandler.java | 29 +++++++--
 .../hive/ql/exec/spark/SparkShuffler.java       |  2 +-
 .../queries/clientpositive/union_top_level.q    |  8 +--
 .../clientpositive/llap/union_top_level.q.out   | 52 ++++++++--------
 .../spark/lateral_view_explode2.q.out           |  2 +-
 .../clientpositive/spark/union_remove_25.q.out  |  2 +-
 .../clientpositive/spark/union_top_level.q.out  | 62 +++++++++----------
 .../spark/vector_outer_join5.q.out              | 40 ++++++------
 15 files changed, 124 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
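
For context, the heart of the change is to stop materializing all values of a key in memory. The shufflers previously produced JavaPairRDD<HiveKey, Iterable<BytesWritable>> via groupByKey, which buffers every value of a key before the reducer runs; the patch switches to repartitionAndSortWithinPartitions, so the reducer instead sees a sorted stream of individual (key, value) pairs and can group them lazily. A minimal standalone sketch of the two shapes follows (class and method names here are illustrative, not from the patch; only the two RDD calls are the ones the patch actually uses):

// Illustrative sketch only -- not part of the patch. Contrasts the old and new
// shuffle output shapes for an existing JavaPairRDD<HiveKey, BytesWritable>.
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;

public class ShuffleShapes {

  // Old shape: groupByKey materializes an Iterable per key, which can grow
  // without bound when one key carries many values (data skew).
  static JavaPairRDD<HiveKey, Iterable<BytesWritable>> oldShuffle(
      JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
    return numPartitions > 0 ? input.groupByKey(numPartitions) : input.groupByKey();
  }

  // New shape: keep individual (key, value) pairs and only partition and sort
  // them; values for the same key arrive consecutively, so the reducer can
  // stream them one at a time.
  static JavaPairRDD<HiveKey, BytesWritable> newShuffle(
      JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
    if (numPartitions < 0) {
      numPartitions = 1;
    }
    return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions));
  }
}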


http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
index e128dd2..8267515 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
@@ -20,21 +20,23 @@ package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 
 public class GroupByShuffler implements SparkShuffler {
 
   @Override
-  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+  public JavaPairRDD<HiveKey, BytesWritable> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
-    if (numPartitions > 0) {
-      return input.groupByKey(numPartitions);
+    if (numPartitions < 0) {
+      numPartitions = 1;
     }
-    return input.groupByKey();
+    return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions));
   }
 
   @Override
   public String getName() {
     return "GroupBy";
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index eeb4443..2b85872 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.BytesWritable;
 import scala.Tuple2;
 
 public class HiveReduceFunction extends HivePairFlatMapFunction<
-  Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
+  Iterator<Tuple2<HiveKey, BytesWritable>>, HiveKey, BytesWritable> {
 
   private static final long serialVersionUID = 1L;
 
@@ -37,7 +37,7 @@ public class HiveReduceFunction extends HivePairFlatMapFunction<
   @SuppressWarnings("unchecked")
   @Override
   public Iterator<Tuple2<HiveKey, BytesWritable>>
-  call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
+  call(Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
     initJobConf();
 
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
index d57cac4..8708819 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.BytesWritable;
 import scala.Tuple2;
 
 public class HiveReduceFunctionResultList extends
-    HiveBaseFunctionResultList<Tuple2<HiveKey, Iterable<BytesWritable>>> {
+    HiveBaseFunctionResultList<Tuple2<HiveKey, BytesWritable>> {
   private static final long serialVersionUID = 1L;
   private final SparkReduceRecordHandler reduceRecordHandler;
 
@@ -37,16 +37,16 @@ public class HiveReduceFunctionResultList extends
    * @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
    */
   public HiveReduceFunctionResultList(
-      Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> inputIterator,
+      Iterator<Tuple2<HiveKey, BytesWritable>> inputIterator,
       SparkReduceRecordHandler reducer) {
     super(inputIterator);
     this.reduceRecordHandler = reducer;
   }
 
   @Override
-  protected void processNextRecord(Tuple2<HiveKey, Iterable<BytesWritable>> inputRecord)
+  protected void processNextRecord(Tuple2<HiveKey, BytesWritable> inputRecord)
       throws IOException {
-    reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2().iterator());
+    reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 3d56876..926e1f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class ReduceTran extends CacheTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
+public class ReduceTran extends CacheTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
   private HiveReduceFunction reduceFunc;
   private String name = "Reduce";
 
@@ -36,7 +36,7 @@ public class ReduceTran extends CacheTran<HiveKey, Iterable<BytesWritable>, Hive
 
   @Override
   public JavaPairRDD<HiveKey, BytesWritable> doTransform(
-      JavaPairRDD<HiveKey, Iterable<BytesWritable>> input) {
+      JavaPairRDD<HiveKey, BytesWritable> input) {
     return input.mapPartitionsToPair(reduceFunc);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index a774395..2aac2b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
 
-public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
+public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
   private final SparkShuffler shuffler;
   private final int numOfPartitions;
   private final boolean toCache;
@@ -42,8 +42,8 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, I
   }
 
   @Override
-  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
-    JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
+  public JavaPairRDD<HiveKey, BytesWritable> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
+    JavaPairRDD<HiveKey, BytesWritable> result = shuffler.shuffle(input, numOfPartitions);
     if (toCache) {
       sparkPlan.addCachedRDDId(result.id());
       result = result.persist(StorageLevel.MEMORY_AND_DISK());

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 997ab7e..b7ab5e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -23,11 +23,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
-
-import java.util.*;
 
 public class SortByShuffler implements SparkShuffler {
 
@@ -43,7 +39,7 @@ public class SortByShuffler implements SparkShuffler {
   }
 
   @Override
-  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+  public JavaPairRDD<HiveKey, BytesWritable> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
     JavaPairRDD<HiveKey, BytesWritable> rdd;
     if (totalOrder) {
@@ -60,7 +56,7 @@ public class SortByShuffler implements SparkShuffler {
       Partitioner partitioner = new HashPartitioner(numPartitions);
       rdd = input.repartitionAndSortWithinPartitions(partitioner);
     }
-    return rdd.mapPartitionsToPair(new ShuffleFunction());
+    return rdd;
   }
 
   @Override
@@ -68,61 +64,4 @@ public class SortByShuffler implements SparkShuffler {
     return "SortBy";
   }
 
-  private static class ShuffleFunction implements
-      PairFlatMapFunction<Iterator<Tuple2<HiveKey, BytesWritable>>,
-          HiveKey, Iterable<BytesWritable>> {
-    // make eclipse happy
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
-      final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
-      // Use input iterator to back returned iterable object.
-      return new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
-        HiveKey curKey = null;
-        List<BytesWritable> curValues = new ArrayList<BytesWritable>();
-
-        @Override
-        public boolean hasNext() {
-          return it.hasNext() || curKey != null;
-        }
-
-        @Override
-        public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
-          // TODO: implement this by accumulating rows with the same key into a list.
-          // Note that this list needs to improved to prevent excessive memory usage, but this
-          // can be done in later phase.
-          while (it.hasNext()) {
-            Tuple2<HiveKey, BytesWritable> pair = it.next();
-            if (curKey != null && !curKey.equals(pair._1())) {
-              HiveKey key = curKey;
-              List<BytesWritable> values = curValues;
-              curKey = pair._1();
-              curValues = new ArrayList<BytesWritable>();
-              curValues.add(pair._2());
-              return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
-            }
-            curKey = pair._1();
-            curValues.add(pair._2());
-          }
-          if (curKey == null) {
-            throw new NoSuchElementException();
-          }
-          // if we get here, this should be the last element we have
-          HiveKey key = curKey;
-          curKey = null;
-          return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
-        }
-
-        @Override
-        public void remove() {
-          // Not implemented.
-          // throw Unsupported Method Invocation Exception.
-          throw new UnsupportedOperationException();
-        }
-
-      };
-    }
-  }
-
 }
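
The ShuffleFunction deleted above rebuilt Iterable<BytesWritable> groups by accumulating every value for the current key into an ArrayList, which is exactly the unbounded buffering this JIRA targets: a single hot key could pull all of its rows into one in-memory list. Because repartitionAndSortWithinPartitions delivers rows for the same key consecutively, group boundaries can instead be detected by comparing adjacent keys while streaming, holding only one record at a time. A hedged sketch of that pattern (interface and names are illustrative, not taken from the Hive code):

// Illustrative sketch only -- not part of the patch. Processes consecutive,
// sorted (key, value) pairs group by group while keeping at most one record
// in memory at a time.
import java.util.Iterator;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import scala.Tuple2;

public class StreamingGrouping {

  interface RowSink {
    void startGroup(HiveKey key);
    void row(BytesWritable value);
    void endGroup();
  }

  static void process(Iterator<Tuple2<HiveKey, BytesWritable>> sortedRows, RowSink sink) {
    HiveKey currentKey = null;
    while (sortedRows.hasNext()) {
      Tuple2<HiveKey, BytesWritable> pair = sortedRows.next();
      // A key different from the previous one marks a group boundary.
      if (currentKey == null || !currentKey.equals(pair._1())) {
        if (currentKey != null) {
          sink.endGroup();
        }
        currentKey = pair._1();
        sink.startGroup(currentKey);
      }
      sink.row(pair._2());
    }
    if (currentKey != null) {
      sink.endGroup();
    }
  }
}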

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 66ffe5d..1b8b058 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -46,10 +45,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -61,9 +56,7 @@ import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 0d31e5f..44f2e4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -26,7 +26,6 @@ import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -48,8 +47,6 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -213,9 +210,31 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
   }
 
+  /**
+   * TODO: Instead of creating a dummy iterator per row, we can implement a private method that's
+   * similar to processRow(Object key, Iterator<E> values) but processes one row at a time. Then,
+   * we just call that private method here.
+   */
   @Override
-  public void processRow(Object key, Object value) throws IOException {
-    throw new UnsupportedOperationException("Do not support this method in SparkReduceRecordHandler.");
+  public void processRow(Object key, final Object value) throws IOException {
+    processRow(key, new Iterator<Object>() {
+      boolean done = false;
+      @Override
+      public boolean hasNext() {
+        return !done;
+      }
+
+      @Override
+      public Object next() {
+        done = true;
+        return value;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("Iterator.remove() is not implemented/supported");
+      }
+    });
   }
 
   @Override
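
The new processRow(Object key, Object value) above wraps each value in a one-shot iterator so the existing processRow(Object key, Iterator<E> values) path can be reused unchanged, as the TODO notes. A small standalone illustration of that one-element-iterator behavior, using java.util.Collections rather than the patch's hand-written anonymous class:

// Illustrative only -- the patch defines its own anonymous Iterator; this just
// demonstrates the same single-element semantics.
import java.util.Collections;
import java.util.Iterator;

public class SingleRowIteratorDemo {
  public static void main(String[] args) {
    Object value = "row-bytes";
    Iterator<Object> it = Collections.<Object>singletonList(value).iterator();
    while (it.hasNext()) {
      System.out.println(it.next()); // prints "row-bytes" exactly once
    }
  }
}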

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
index 40e251f..d71d940 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
@@ -24,7 +24,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 
 public interface SparkShuffler {
 
-  JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+  JavaPairRDD<HiveKey, BytesWritable> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions);
 
   public String getName();

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/queries/clientpositive/union_top_level.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/union_top_level.q b/ql/src/test/queries/clientpositive/union_top_level.q
index d93fe38..2050442 100644
--- a/ql/src/test/queries/clientpositive/union_top_level.q
+++ b/ql/src/test/queries/clientpositive/union_top_level.q
@@ -16,13 +16,13 @@ union all
 select * from (select key, 2 as value from src where key % 3 == 2 limit 3)c;
 
 explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b;
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b;
 
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b;
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b;
 
 -- ctas
 explain

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/llap/union_top_level.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/union_top_level.q.out b/ql/src/test/results/clientpositive/llap/union_top_level.q.out
index b48ab83..11b3b8f 100644
--- a/ql/src/test/results/clientpositive/llap/union_top_level.q.out
+++ b/ql/src/test/results/clientpositive/llap/union_top_level.q.out
@@ -190,14 +190,14 @@ POSTHOOK: Input: default@src
 484	1
 86	2
 PREHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -307,21 +307,19 @@ STAGE PLANS:
                   expressions: _col2 (type: string), _col1 (type: string)
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
-                  Limit
-                    Number of rows: 10
-                    Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      sort order: 
-                      Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
-                      TopN Hash Memory Usage: 0.1
-                      value expressions: _col0 (type: string), _col1 (type: string)
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+                    TopN Hash Memory Usage: 0.1
+                    value expressions: _col1 (type: string)
         Reducer 3 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
-                expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
                 Limit
                   Number of rows: 10
                   Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
@@ -347,21 +345,19 @@ STAGE PLANS:
                   expressions: _col2 (type: string), _col1 (type: string)
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
-                  Limit
-                    Number of rows: 10
-                    Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      sort order: 
-                      Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
-                      TopN Hash Memory Usage: 0.1
-                      value expressions: _col0 (type: string), _col1 (type: string)
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+                    TopN Hash Memory Usage: 0.1
+                    value expressions: _col1 (type: string)
         Reducer 8 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
-                expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
                 Limit
                   Number of rows: 10
                   Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
@@ -381,15 +377,15 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 #### A masked pattern was here ####
-POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out b/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
index 65a6e3e..42e29ac 100644
--- a/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
+++ b/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
@@ -95,9 +95,9 @@ POSTHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3))
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 #### A masked pattern was here ####
-3	3
 1	1
 2	2
+3	3
 PREHOOK: query: DROP TEMPORARY FUNCTION explode2
 PREHOOK: type: DROPFUNCTION
 PREHOOK: Output: explode2

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/union_remove_25.q.out b/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
index 9fec1d4..e7844ed 100644
--- a/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
+++ b/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
@@ -370,7 +370,7 @@ Table:              	outputtbl2
 #### A masked pattern was here ####
 Partition Parameters:	 	 
 	numFiles            	2                   
-	totalSize           	6826                
+	totalSize           	6812                
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/spark/union_top_level.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/union_top_level.q.out b/ql/src/test/results/clientpositive/spark/union_top_level.q.out
index c9cb5d3..0b4ed79 100644
--- a/ql/src/test/results/clientpositive/spark/union_top_level.q.out
+++ b/ql/src/test/results/clientpositive/spark/union_top_level.q.out
@@ -178,14 +178,14 @@ POSTHOOK: Input: default@src
 484	1
 86	2
 PREHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -196,8 +196,8 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
-        Reducer 3 <- Reducer 2 (GROUP, 1)
-        Reducer 7 <- Reducer 2 (GROUP, 1)
+        Reducer 3 <- Reducer 2 (SORT, 1)
+        Reducer 7 <- Reducer 2 (SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -249,20 +249,18 @@ STAGE PLANS:
                   expressions: _col2 (type: string), _col1 (type: string)
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
-                  Limit
-                    Number of rows: 10
-                    Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      sort order: 
-                      Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
-                      TopN Hash Memory Usage: 0.1
-                      value expressions: _col0 (type: string), _col1 (type: string)
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                    TopN Hash Memory Usage: 0.1
+                    value expressions: _col1 (type: string)
         Reducer 3 
             Reduce Operator Tree:
               Select Operator
-                expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
                 Limit
                   Number of rows: 10
                   Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
@@ -276,9 +274,9 @@ STAGE PLANS:
         Reducer 7 
             Reduce Operator Tree:
               Select Operator
-                expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
                 Limit
                   Number of rows: 10
                   Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
@@ -296,15 +294,15 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 #### A masked pattern was here ####
-POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 #### A masked pattern was here ####
@@ -318,16 +316,16 @@ POSTHOOK: Input: default@src
 0	val_0
 0	val_0
 0	val_0
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-104	val_104
-104	val_104
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+10	val_10
+10	val_10
 PREHOOK: query: explain
 create table union_top as
 select * from (select key, 0 as value from src where key % 3 == 0 limit 3)a

http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out b/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out
index 9e1742f..55e9287 100644
--- a/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out
@@ -90,11 +90,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: st
-                  Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: ctinyint (type: tinyint)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       keys:
                         0 _col0 (type: tinyint)
@@ -208,11 +208,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: sm
-                  Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: ctinyint (type: tinyint)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       filter predicates:
                         0 {(_col1 = 2)}
@@ -332,11 +332,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: sm
-                  Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: ctinyint (type: tinyint)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       filter predicates:
                         0 {((UDFToInteger(_col0) pmod 4) = _col1)}
@@ -456,11 +456,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: sm
-                  Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: ctinyint (type: tinyint)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       filter predicates:
                         0 {(_col0 < 100)}
@@ -584,11 +584,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: sm
-                  Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: cbigint (type: bigint)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       keys:
                         0 UDFToLong(_col1) (type: bigint)
@@ -794,11 +794,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: st
-                  Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: cmodtinyint (type: int)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       keys:
                         0 _col0 (type: int)
@@ -912,11 +912,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: sm
-                  Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: cmodtinyint (type: int)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       filter predicates:
                         0 {(_col1 = 2)}
@@ -1036,11 +1036,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: sm
-                  Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: cmodtinyint (type: int)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       filter predicates:
                         0 {((_col0 pmod 4) = _col1)}
@@ -1160,11 +1160,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: sm
-                  Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: cmodtinyint (type: int)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       filter predicates:
                         0 {(_col0 < 3)}
@@ -1288,11 +1288,11 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: sm
-                  Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: cbigint (type: bigint)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
                       keys:
                         0 UDFToLong(_col1) (type: bigint)