Posted to commits@hive.apache.org by xu...@apache.org on 2017/01/20 20:57:01 UTC
hive git commit: HIVE-15580: Eliminate unbounded memory usage for orderBy and groupBy in Hive on Spark (reviewed by Chao Sun)
Repository: hive
Updated Branches:
refs/heads/master f968cf78a -> 811b3e39e
HIVE-15580: Eliminate unbounded memory usage for orderBy and groupBy in Hive on Spark (reviewed by Chao Sun)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/811b3e39
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/811b3e39
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/811b3e39
Branch: refs/heads/master
Commit: 811b3e39ed569232c4f138c1287109ef8ebce132
Parents: f968cf7
Author: Xuefu Zhang <xu...@uber.com>
Authored: Fri Jan 20 12:56:49 2017 -0800
Committer: Xuefu Zhang <xu...@uber.com>
Committed: Fri Jan 20 12:56:49 2017 -0800
----------------------------------------------------------------------
.../hive/ql/exec/spark/GroupByShuffler.java | 10 +--
.../hive/ql/exec/spark/HiveReduceFunction.java | 4 +-
.../spark/HiveReduceFunctionResultList.java | 8 +--
.../hadoop/hive/ql/exec/spark/ReduceTran.java | 4 +-
.../hadoop/hive/ql/exec/spark/ShuffleTran.java | 6 +-
.../hive/ql/exec/spark/SortByShuffler.java | 65 +-------------------
.../hive/ql/exec/spark/SparkPlanGenerator.java | 7 ---
.../ql/exec/spark/SparkReduceRecordHandler.java | 29 +++++++--
.../hive/ql/exec/spark/SparkShuffler.java | 2 +-
.../queries/clientpositive/union_top_level.q | 8 +--
.../clientpositive/llap/union_top_level.q.out | 52 ++++++++--------
.../spark/lateral_view_explode2.q.out | 2 +-
.../clientpositive/spark/union_remove_25.q.out | 2 +-
.../clientpositive/spark/union_top_level.q.out | 62 +++++++++----------
.../spark/vector_outer_join5.q.out | 40 ++++++------
15 files changed, 124 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
index e128dd2..8267515 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
@@ -20,21 +20,23 @@ package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
public class GroupByShuffler implements SparkShuffler {
@Override
- public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+ public JavaPairRDD<HiveKey, BytesWritable> shuffle(
JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
- if (numPartitions > 0) {
- return input.groupByKey(numPartitions);
+ if (numPartitions < 0) {
+ numPartitions = 1;
}
- return input.groupByKey();
+ return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions));
}
@Override
public String getName() {
return "GroupBy";
}
+
}
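
For context on the memory behavior this hunk targets: groupByKey materializes an Iterable<BytesWritable> holding every value of a key before the reducer runs, so a skewed key can consume unbounded reducer memory, while repartitionAndSortWithinPartitions only sorts records within each partition and streams them to the reducer one pair at a time. A minimal sketch of the two shapes, illustrative only and not part of the commit (the class and method names are made up; the Spark calls are the standard JavaPairRDD API):

import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;

final class GroupByShuffleSketch {

  // Old shape: all values of one key are buffered into a single in-memory Iterable.
  static JavaPairRDD<HiveKey, Iterable<BytesWritable>> grouped(
      JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
    return input.groupByKey(numPartitions);
  }

  // New shape: hash-partition and sort within each partition; the reducer then consumes
  // (key, value) pairs one at a time, so per-key memory stays bounded.
  static JavaPairRDD<HiveKey, BytesWritable> sortedWithinPartitions(
      JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
    if (numPartitions < 0) {
      numPartitions = 1;  // mirrors the guard added in this commit
    }
    return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions));
  }
}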
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index eeb4443..2b85872 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.BytesWritable;
import scala.Tuple2;
public class HiveReduceFunction extends HivePairFlatMapFunction<
- Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
+ Iterator<Tuple2<HiveKey, BytesWritable>>, HiveKey, BytesWritable> {
private static final long serialVersionUID = 1L;
@@ -37,7 +37,7 @@ public class HiveReduceFunction extends HivePairFlatMapFunction<
@SuppressWarnings("unchecked")
@Override
public Iterator<Tuple2<HiveKey, BytesWritable>>
- call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
+ call(Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
initJobConf();
SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
index d57cac4..8708819 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.BytesWritable;
import scala.Tuple2;
public class HiveReduceFunctionResultList extends
- HiveBaseFunctionResultList<Tuple2<HiveKey, Iterable<BytesWritable>>> {
+ HiveBaseFunctionResultList<Tuple2<HiveKey, BytesWritable>> {
private static final long serialVersionUID = 1L;
private final SparkReduceRecordHandler reduceRecordHandler;
@@ -37,16 +37,16 @@ public class HiveReduceFunctionResultList extends
* @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
*/
public HiveReduceFunctionResultList(
- Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> inputIterator,
+ Iterator<Tuple2<HiveKey, BytesWritable>> inputIterator,
SparkReduceRecordHandler reducer) {
super(inputIterator);
this.reduceRecordHandler = reducer;
}
@Override
- protected void processNextRecord(Tuple2<HiveKey, Iterable<BytesWritable>> inputRecord)
+ protected void processNextRecord(Tuple2<HiveKey, BytesWritable> inputRecord)
throws IOException {
- reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2().iterator());
+ reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2());
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 3d56876..926e1f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
-public class ReduceTran extends CacheTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
+public class ReduceTran extends CacheTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
private HiveReduceFunction reduceFunc;
private String name = "Reduce";
@@ -36,7 +36,7 @@ public class ReduceTran extends CacheTran<HiveKey, Iterable<BytesWritable>, Hive
@Override
public JavaPairRDD<HiveKey, BytesWritable> doTransform(
- JavaPairRDD<HiveKey, Iterable<BytesWritable>> input) {
+ JavaPairRDD<HiveKey, BytesWritable> input) {
return input.mapPartitionsToPair(reduceFunc);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index a774395..2aac2b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
-public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
+public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
private final SparkShuffler shuffler;
private final int numOfPartitions;
private final boolean toCache;
@@ -42,8 +42,8 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, I
}
@Override
- public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
- JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
+ public JavaPairRDD<HiveKey, BytesWritable> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
+ JavaPairRDD<HiveKey, BytesWritable> result = shuffler.shuffle(input, numOfPartitions);
if (toCache) {
sparkPlan.addCachedRDDId(result.id());
result = result.persist(StorageLevel.MEMORY_AND_DISK());
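
Note that the caching path itself is unchanged here: only the value type of the shuffled RDD changes from Iterable<BytesWritable> to BytesWritable. A hedged sketch of the persist pattern the hunk keeps, illustrative only (the class and variable names are made up; StorageLevel.MEMORY_AND_DISK() is the standard Spark Java API):

import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;

final class CachedShuffleSketch {
  // Persist the shuffled output when it will be read by more than one downstream branch;
  // MEMORY_AND_DISK spills to disk rather than recomputing or failing under memory pressure.
  static JavaPairRDD<HiveKey, BytesWritable> maybeCache(
      JavaPairRDD<HiveKey, BytesWritable> shuffled, boolean toCache) {
    return toCache ? shuffled.persist(StorageLevel.MEMORY_AND_DISK()) : shuffled;
  }
}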
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 997ab7e..b7ab5e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -23,11 +23,7 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
-
-import java.util.*;
public class SortByShuffler implements SparkShuffler {
@@ -43,7 +39,7 @@ public class SortByShuffler implements SparkShuffler {
}
@Override
- public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+ public JavaPairRDD<HiveKey, BytesWritable> shuffle(
JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
JavaPairRDD<HiveKey, BytesWritable> rdd;
if (totalOrder) {
@@ -60,7 +56,7 @@ public class SortByShuffler implements SparkShuffler {
Partitioner partitioner = new HashPartitioner(numPartitions);
rdd = input.repartitionAndSortWithinPartitions(partitioner);
}
- return rdd.mapPartitionsToPair(new ShuffleFunction());
+ return rdd;
}
@Override
@@ -68,61 +64,4 @@ public class SortByShuffler implements SparkShuffler {
return "SortBy";
}
- private static class ShuffleFunction implements
- PairFlatMapFunction<Iterator<Tuple2<HiveKey, BytesWritable>>,
- HiveKey, Iterable<BytesWritable>> {
- // make eclipse happy
- private static final long serialVersionUID = 1L;
-
- @Override
- public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
- final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
- // Use input iterator to back returned iterable object.
- return new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
- HiveKey curKey = null;
- List<BytesWritable> curValues = new ArrayList<BytesWritable>();
-
- @Override
- public boolean hasNext() {
- return it.hasNext() || curKey != null;
- }
-
- @Override
- public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
- // TODO: implement this by accumulating rows with the same key into a list.
- // Note that this list needs to improved to prevent excessive memory usage, but this
- // can be done in later phase.
- while (it.hasNext()) {
- Tuple2<HiveKey, BytesWritable> pair = it.next();
- if (curKey != null && !curKey.equals(pair._1())) {
- HiveKey key = curKey;
- List<BytesWritable> values = curValues;
- curKey = pair._1();
- curValues = new ArrayList<BytesWritable>();
- curValues.add(pair._2());
- return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
- }
- curKey = pair._1();
- curValues.add(pair._2());
- }
- if (curKey == null) {
- throw new NoSuchElementException();
- }
- // if we get here, this should be the last element we have
- HiveKey key = curKey;
- curKey = null;
- return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
- }
-
- @Override
- public void remove() {
- // Not implemented.
- // throw Unsupported Method Invocation Exception.
- throw new UnsupportedOperationException();
- }
-
- };
- }
- }
-
}
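
The deleted ShuffleFunction above is the unbounded part this commit removes: it accumulated every value with the same key into an ArrayList before handing the group to the reducer. Because the shuffle output is already sorted by key within each partition, a consumer can still detect key boundaries on the fly without buffering values. A hedged sketch of that streaming idea, illustrative only and not part of the commit (the interface and names are made up):

import java.util.Iterator;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import scala.Tuple2;

final class StreamingGroupSketch {
  interface RowSink {
    void startKey(HiveKey key);
    void row(BytesWritable value);
  }

  // Walk a partition-sorted stream of (key, value) pairs and signal key boundaries,
  // keeping only the previous key in memory -- never a list of values.
  static void forEachGroup(Iterator<Tuple2<HiveKey, BytesWritable>> sorted, RowSink sink) {
    HiveKey current = null;
    while (sorted.hasNext()) {
      Tuple2<HiveKey, BytesWritable> pair = sorted.next();
      if (current == null || !current.equals(pair._1())) {
        current = pair._1();
        sink.startKey(current);
      }
      sink.row(pair._2());
    }
  }
}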
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 66ffe5d..1b8b058 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -46,10 +45,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -61,9 +56,7 @@ import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 0d31e5f..44f2e4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -26,7 +26,6 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -48,8 +47,6 @@ import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -213,9 +210,31 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
}
+ /**
+ * TODO: Instead of creating a dummy iterator per row, we can implement a private method that's
+ * similar to processRow(Object key, Iterator<E> values) but processes one row at a time. Then,
+ * we just call that private method here.
+ */
@Override
- public void processRow(Object key, Object value) throws IOException {
- throw new UnsupportedOperationException("Do not support this method in SparkReduceRecordHandler.");
+ public void processRow(Object key, final Object value) throws IOException {
+ processRow(key, new Iterator<Object>() {
+ boolean done = false;
+ @Override
+ public boolean hasNext() {
+ return !done;
+ }
+
+ @Override
+ public Object next() {
+ done = true;
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Iterator.remove() is not implemented/supported");
+ }
+ });
}
@Override
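
The new processRow(Object key, Object value) wraps the single shuffled value in a one-element iterator so the existing processRow(key, values-iterator) path is reused unchanged; the Javadoc TODO above notes that a dedicated per-row method would avoid the per-row iterator allocation. For reference, an equivalent and shorter way to build that one-element iterator is Collections.singleton(value).iterator(); a hedged sketch, not the commit's code (the class and method names are made up):

import java.util.Collections;
import java.util.Iterator;

final class SingleRowIteratorSketch {
  // Same behavior as the anonymous Iterator in the hunk above: exactly one element per row.
  static Iterator<Object> singleRow(Object value) {
    return Collections.<Object>singleton(value).iterator();
  }
}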
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
index 40e251f..d71d940 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
@@ -24,7 +24,7 @@ import org.apache.spark.api.java.JavaPairRDD;
public interface SparkShuffler {
- JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+ JavaPairRDD<HiveKey, BytesWritable> shuffle(
JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions);
public String getName();
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/queries/clientpositive/union_top_level.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/union_top_level.q b/ql/src/test/queries/clientpositive/union_top_level.q
index d93fe38..2050442 100644
--- a/ql/src/test/queries/clientpositive/union_top_level.q
+++ b/ql/src/test/queries/clientpositive/union_top_level.q
@@ -16,13 +16,13 @@ union all
select * from (select key, 2 as value from src where key % 3 == 2 limit 3)c;
explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b;
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b;
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b;
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b;
-- ctas
explain
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/llap/union_top_level.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/union_top_level.q.out b/ql/src/test/results/clientpositive/llap/union_top_level.q.out
index b48ab83..11b3b8f 100644
--- a/ql/src/test/results/clientpositive/llap/union_top_level.q.out
+++ b/ql/src/test/results/clientpositive/llap/union_top_level.q.out
@@ -190,14 +190,14 @@ POSTHOOK: Input: default@src
484 1
86 2
PREHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -307,21 +307,19 @@ STAGE PLANS:
expressions: _col2 (type: string), _col1 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
- Limit
- Number of rows: 10
- Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- sort order:
- Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col0 (type: string), _col1 (type: string)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: string)
Reducer 3
Execution mode: llap
Reduce Operator Tree:
Select Operator
- expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+ expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
outputColumnNames: _col0, _col1
- Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
+ Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
Limit
Number of rows: 10
Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
@@ -347,21 +345,19 @@ STAGE PLANS:
expressions: _col2 (type: string), _col1 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
- Limit
- Number of rows: 10
- Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- sort order:
- Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col0 (type: string), _col1 (type: string)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: string)
Reducer 8
Execution mode: llap
Reduce Operator Tree:
Select Operator
- expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+ expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
outputColumnNames: _col0, _col1
- Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
+ Statistics: Num rows: 1219 Data size: 216982 Basic stats: COMPLETE Column stats: COMPLETE
Limit
Number of rows: 10
Statistics: Num rows: 10 Data size: 1780 Basic stats: COMPLETE Column stats: COMPLETE
@@ -381,15 +377,15 @@ STAGE PLANS:
Processor Tree:
ListSink
-PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
PREHOOK: type: QUERY
PREHOOK: Input: default@src
#### A masked pattern was here ####
-POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
#### A masked pattern was here ####
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out b/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
index 65a6e3e..42e29ac 100644
--- a/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
+++ b/ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
@@ -95,9 +95,9 @@ POSTHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3))
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
#### A masked pattern was here ####
-3 3
1 1
2 2
+3 3
PREHOOK: query: DROP TEMPORARY FUNCTION explode2
PREHOOK: type: DROPFUNCTION
PREHOOK: Output: explode2
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/union_remove_25.q.out b/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
index 9fec1d4..e7844ed 100644
--- a/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
+++ b/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
@@ -370,7 +370,7 @@ Table: outputtbl2
#### A masked pattern was here ####
Partition Parameters:
numFiles 2
- totalSize 6826
+ totalSize 6812
#### A masked pattern was here ####
# Storage Information
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/spark/union_top_level.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/union_top_level.q.out b/ql/src/test/results/clientpositive/spark/union_top_level.q.out
index c9cb5d3..0b4ed79 100644
--- a/ql/src/test/results/clientpositive/spark/union_top_level.q.out
+++ b/ql/src/test/results/clientpositive/spark/union_top_level.q.out
@@ -178,14 +178,14 @@ POSTHOOK: Input: default@src
484 1
86 2
PREHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -196,8 +196,8 @@ STAGE PLANS:
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
- Reducer 3 <- Reducer 2 (GROUP, 1)
- Reducer 7 <- Reducer 2 (GROUP, 1)
+ Reducer 3 <- Reducer 2 (SORT, 1)
+ Reducer 7 <- Reducer 2 (SORT, 1)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -249,20 +249,18 @@ STAGE PLANS:
expressions: _col2 (type: string), _col1 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
- Limit
- Number of rows: 10
- Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- sort order:
- Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
- TopN Hash Memory Usage: 0.1
- value expressions: _col0 (type: string), _col1 (type: string)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: string)
Reducer 3
Reduce Operator Tree:
Select Operator
- expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+ expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
outputColumnNames: _col0, _col1
- Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
Limit
Number of rows: 10
Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
@@ -276,9 +274,9 @@ STAGE PLANS:
Reducer 7
Reduce Operator Tree:
Select Operator
- expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+ expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
outputColumnNames: _col0, _col1
- Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
Limit
Number of rows: 10
Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
@@ -296,15 +294,15 @@ STAGE PLANS:
Processor Tree:
ListSink
-PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
PREHOOK: type: QUERY
PREHOOK: Input: default@src
#### A masked pattern was here ####
-POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
#### A masked pattern was here ####
@@ -318,16 +316,16 @@ POSTHOOK: Input: default@src
0 val_0
0 val_0
0 val_0
-100 val_100
-100 val_100
-100 val_100
-100 val_100
-100 val_100
-100 val_100
-100 val_100
-100 val_100
-104 val_104
-104 val_104
+0 val_0
+0 val_0
+0 val_0
+0 val_0
+0 val_0
+0 val_0
+0 val_0
+0 val_0
+10 val_10
+10 val_10
PREHOOK: query: explain
create table union_top as
select * from (select key, 0 as value from src where key % 3 == 0 limit 3)a
http://git-wip-us.apache.org/repos/asf/hive/blob/811b3e39/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out b/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out
index 9e1742f..55e9287 100644
--- a/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_outer_join5.q.out
@@ -90,11 +90,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: st
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: ctinyint (type: tinyint)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
keys:
0 _col0 (type: tinyint)
@@ -208,11 +208,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: sm
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: ctinyint (type: tinyint)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
filter predicates:
0 {(_col1 = 2)}
@@ -332,11 +332,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: sm
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: ctinyint (type: tinyint)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
filter predicates:
0 {((UDFToInteger(_col0) pmod 4) = _col1)}
@@ -456,11 +456,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: sm
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: ctinyint (type: tinyint)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
filter predicates:
0 {(_col0 < 100)}
@@ -584,11 +584,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: sm
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: cbigint (type: bigint)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 380 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 379 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
keys:
0 UDFToLong(_col1) (type: bigint)
@@ -794,11 +794,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: st
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: cmodtinyint (type: int)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
keys:
0 _col0 (type: int)
@@ -912,11 +912,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: sm
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: cmodtinyint (type: int)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
filter predicates:
0 {(_col1 = 2)}
@@ -1036,11 +1036,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: sm
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: cmodtinyint (type: int)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
filter predicates:
0 {((_col0 pmod 4) = _col1)}
@@ -1160,11 +1160,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: sm
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: cmodtinyint (type: int)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
filter predicates:
0 {(_col0 < 3)}
@@ -1288,11 +1288,11 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: sm
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: cbigint (type: bigint)
outputColumnNames: _col0
- Statistics: Num rows: 100 Data size: 363 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 100 Data size: 362 Basic stats: COMPLETE Column stats: NONE
Spark HashTable Sink Operator
keys:
0 UDFToLong(_col1) (type: bigint)