You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/07/19 21:24:26 UTC
[13/13] hive git commit: HIVE-17896: TopNKey: Create a standalone
vectorizable TopNKey operator (Teddy Choi,
reviewed by Jesus Camacho Rodriguez)
HIVE-17896: TopNKey: Create a standalone vectorizable TopNKey operator (Teddy Choi, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc294d32
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc294d32
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc294d32
Branch: refs/heads/branch-3
Commit: cc294d32fac3be22f5f26db814ffcc2824caedbf
Parents: c2a83ea
Author: Teddy Choi <pu...@gmail.com>
Authored: Thu Jul 19 13:55:57 2018 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Jul 19 14:23:55 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../test/resources/testconfiguration.properties | 6 +-
.../hadoop/hive/ql/plan/api/OperatorType.java | 5 +-
.../hadoop/hive/ql/exec/KeyWrapperFactory.java | 2 +-
.../hadoop/hive/ql/exec/OperatorFactory.java | 4 +
.../hadoop/hive/ql/exec/TopNKeyOperator.java | 214 +++
.../ql/exec/vector/VectorTopNKeyOperator.java | 304 +++
.../hive/ql/optimizer/TopNKeyProcessor.java | 109 ++
.../hive/ql/optimizer/physical/Vectorizer.java | 37 +
.../hadoop/hive/ql/parse/TezCompiler.java | 27 +
.../apache/hadoop/hive/ql/plan/TopNKeyDesc.java | 139 ++
.../hadoop/hive/ql/plan/VectorTopNKeyDesc.java | 39 +
ql/src/test/queries/clientpositive/topnkey.q | 31 +
.../queries/clientpositive/vector_topnkey.q | 30 +
.../clientpositive/llap/bucket_groupby.q.out | 274 +--
.../clientpositive/llap/check_constraint.q.out | 27 +-
.../clientpositive/llap/explainuser_1.q.out | 28 +-
.../clientpositive/llap/explainuser_2.q.out | 406 ++--
.../clientpositive/llap/limit_pushdown.q.out | 135 +-
.../clientpositive/llap/limit_pushdown3.q.out | 89 +-
.../llap/llap_decimal64_reader.q.out | 46 +-
.../clientpositive/llap/offset_limit.q.out | 27 +-
.../llap/offset_limit_ppd_optimizer.q.out | 85 +-
.../llap/orc_struct_type_vectorization.q.out | 53 +-
.../parquet_complex_types_vectorization.q.out | 159 +-
.../llap/parquet_map_type_vectorization.q.out | 53 +-
.../parquet_struct_type_vectorization.q.out | 53 +-
.../results/clientpositive/llap/topnkey.q.out | 318 ++++
.../llap/vector_cast_constant.q.out | 55 +-
.../clientpositive/llap/vector_char_2.q.out | 110 +-
.../vector_groupby_grouping_sets_limit.q.out | 346 ++--
.../llap/vector_groupby_reduce.q.out | 49 +-
.../llap/vector_mr_diff_schema_alias.q.out | 25 +-
.../llap/vector_reduce_groupby_decimal.q.out | 53 +-
.../llap/vector_string_concat.q.out | 47 +-
.../clientpositive/llap/vector_topnkey.q.out | 592 ++++++
.../llap/vectorization_limit.q.out | 63 +-
.../clientpositive/perf/tez/query10.q.out | 256 +--
.../clientpositive/perf/tez/query14.q.out | 1752 +++++++++---------
.../clientpositive/perf/tez/query15.q.out | 116 +-
.../clientpositive/perf/tez/query17.q.out | 222 +--
.../clientpositive/perf/tez/query25.q.out | 216 +--
.../clientpositive/perf/tez/query26.q.out | 138 +-
.../clientpositive/perf/tez/query27.q.out | 142 +-
.../clientpositive/perf/tez/query29.q.out | 220 +--
.../clientpositive/perf/tez/query35.q.out | 256 +--
.../clientpositive/perf/tez/query37.q.out | 98 +-
.../clientpositive/perf/tez/query40.q.out | 140 +-
.../clientpositive/perf/tez/query43.q.out | 84 +-
.../clientpositive/perf/tez/query45.q.out | 228 +--
.../clientpositive/perf/tez/query49.q.out | 412 ++--
.../clientpositive/perf/tez/query5.q.out | 360 ++--
.../clientpositive/perf/tez/query50.q.out | 140 +-
.../clientpositive/perf/tez/query60.q.out | 348 ++--
.../clientpositive/perf/tez/query66.q.out | 276 +--
.../clientpositive/perf/tez/query69.q.out | 274 +--
.../clientpositive/perf/tez/query7.q.out | 138 +-
.../clientpositive/perf/tez/query76.q.out | 224 +--
.../clientpositive/perf/tez/query77.q.out | 452 ++---
.../clientpositive/perf/tez/query8.q.out | 232 +--
.../clientpositive/perf/tez/query80.q.out | 492 ++---
.../clientpositive/perf/tez/query82.q.out | 98 +-
.../clientpositive/perf/tez/query99.q.out | 142 +-
.../results/clientpositive/tez/topnkey.q.out | 162 ++
.../clientpositive/tez/vector_topnkey.q.out | 162 ++
.../test/results/clientpositive/topnkey.q.out | 301 +++
.../results/clientpositive/vector_topnkey.q.out | 480 +++++
.../objectinspector/ObjectInspectorUtils.java | 19 +
68 files changed, 7996 insertions(+), 4626 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 808ec66..fca1635 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2228,6 +2228,8 @@ public class HiveConf extends Configuration {
"If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime\n" +
"would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."),
+ HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable top n key optimizer."),
+
HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
"Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" +
"and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."),
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index a0037d8..ed99be5 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -44,9 +44,11 @@ disabled.query.files=ql_rewrite_gbtoidx.q,\
minitez.query.files.shared=delete_orig_table.q,\
orc_merge12.q,\
orc_vectorization_ppd.q,\
+ topnkey.q,\
update_orig_table.q,\
vector_join_part_col_char.q,\
- vector_non_string_partition.q
+ vector_non_string_partition.q,\
+ vector_topnkey.q
# NOTE: Add tests to minitez only if it is very
# specific to tez and cannot be added to minillap.
@@ -227,6 +229,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
subquery_exists.q,\
subquery_in.q,\
temp_table.q,\
+ topnkey.q,\
union2.q,\
union3.q,\
union4.q,\
@@ -333,6 +336,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
vector_reduce_groupby_duplicate_cols.q,\
vector_string_concat.q,\
vector_struct_in.q,\
+ vector_topnkey.q,\
vector_udf_character_length.q,\
vector_udf_octet_length.q,\
vector_varchar_4.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
index a002348..f8328be 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
@@ -37,7 +37,8 @@ public enum OperatorType implements org.apache.thrift.TEnum {
ORCFILEMERGE(22),
RCFILEMERGE(23),
MERGEJOIN(24),
- SPARKPRUNINGSINK(25);
+ SPARKPRUNINGSINK(25),
+ TOPNKEY(26);
private final int value;
@@ -110,6 +111,8 @@ public enum OperatorType implements org.apache.thrift.TEnum {
return MERGEJOIN;
case 25:
return SPARKPRUNINGSINK;
+ case 26:
+ return TOPNKEY;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
index 71ee25d..f1bf902 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
@@ -168,7 +168,6 @@ public class KeyWrapperFactory {
}
}
- transient Object[] singleEleArray = new Object[1];
transient StringObjectInspector soi_new, soi_copy;
class TextKeyWrapper extends KeyWrapper {
@@ -180,6 +179,7 @@ public class KeyWrapperFactory {
int hashcode;
Object key;
boolean isCopy;
+ transient Object[] singleEleArray = new Object[1];
public TextKeyWrapper(boolean isCopy) {
this(-1, null, isCopy);
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index 7bb6590..b61d37e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hive.ql.plan.ScriptDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.plan.UnionDesc;
import org.apache.hadoop.hive.ql.plan.VectorDesc;
@@ -126,6 +128,7 @@ public final class OperatorFactory {
opvec.put(OrcFileMergeDesc.class, OrcFileMergeOperator.class);
opvec.put(CommonMergeJoinDesc.class, CommonMergeJoinOperator.class);
opvec.put(ListSinkDesc.class, ListSinkOperator.class);
+ opvec.put(TopNKeyDesc.class, TopNKeyOperator.class);
}
static {
@@ -143,6 +146,7 @@ public final class OperatorFactory {
vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class);
vectorOpvec.put(PTFDesc.class, VectorPTFOperator.class);
vectorOpvec.put(SparkHashTableSinkDesc.class, VectorSparkHashTableSinkOperator.class);
+ vectorOpvec.put(TopNKeyDesc.class, VectorTopNKeyOperator.class);
}
public static <T extends OperatorDesc> Operator<T> getVectorOperator(
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
new file mode 100644
index 0000000..3dfeeaf
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
+
+/**
+ * TopNKeyOperator forwards only the rows whose key is among the top N keys of its row buffer.
+ */
+public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // Maximum number of keys to hold
+  private transient int topN;
+
+  // Keys seen in the current buffer; trimmed back to topN entries with poll()
+  private transient PriorityQueue<KeyWrapper> priorityQueue;
+
+  // Reusable wrapper that evaluates the keys of an incoming row (input object inspector)
+  private transient KeyWrapper keyWrapper;
+
+  // Reusable wrapper that evaluates the keys of a buffered row (standard object inspector)
+  private transient KeyWrapper standardKeyWrapper;
+
+  // Buffer capacity and current number of buffered rows
+  private transient int rowLimit;
+  private transient int rowSize;
+
+  // Buffered rows, copied to standard objects
+  private transient Object[] rows;
+
+  /** Kryo ctor. */
+  public TopNKeyOperator() {
+    super();
+  }
+
+  public TopNKeyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  // Compares key arrays with ObjectInspectorUtils.compare and a per-column descending flag.
+  public static class KeyWrapperComparator implements Comparator<KeyWrapper> {
+    private ObjectInspector[] objectInspectors1;
+    private ObjectInspector[] objectInspectors2;
+    private boolean[] columnSortOrderIsDesc;
+    public KeyWrapperComparator(ObjectInspector[] objectInspectors1, ObjectInspector[]
+        objectInspectors2, boolean[] columnSortOrderIsDesc) {
+      this.objectInspectors1 = objectInspectors1;
+      this.objectInspectors2 = objectInspectors2;
+      this.columnSortOrderIsDesc = columnSortOrderIsDesc;
+    }
+
+    @Override
+    public int compare(KeyWrapper key1, KeyWrapper key2) {
+      return ObjectInspectorUtils.compare(key1.getKeyArray(), objectInspectors1,
+          key2.getKeyArray(), objectInspectors2, columnSortOrderIsDesc);
+    }
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+    this.topN = conf.getTopN();
+
+    // A '-' in the sort order string marks a descending key column
+    String columnSortOrder = conf.getColumnSortOrder();
+    boolean[] columnSortOrderIsDesc = new boolean[columnSortOrder.length()];
+    for (int i = 0; i < columnSortOrderIsDesc.length; i++) {
+      columnSortOrderIsDesc[i] = (columnSortOrder.charAt(i) == '-');
+    }
+
+    // Rows are forwarded as standard copies, so the output inspector is the standard one
+    ObjectInspector rowInspector = inputObjInspectors[0];
+    outputObjInspector = ObjectInspectorUtils.getStandardObjectInspector(rowInspector);
+
+    // Key evaluators over the input row and, separately, over the buffered standard row
+    int numKeys = conf.getKeyColumns().size();
+    ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numKeys];
+    ObjectInspector[] keyObjectInspectors = new ObjectInspector[numKeys];
+    ExprNodeEvaluator[] standardKeyFields = new ExprNodeEvaluator[numKeys];
+    ObjectInspector[] standardKeyObjectInspectors = new ObjectInspector[numKeys];
+    for (int i = 0; i < numKeys; i++) {
+      ExprNodeDesc key = conf.getKeyColumns().get(i);
+      keyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
+      keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
+      standardKeyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
+      standardKeyObjectInspectors[i] = standardKeyFields[i].initialize(outputObjInspector);
+    }
+
+    priorityQueue = new PriorityQueue<>(topN + 1, new TopNKeyOperator.KeyWrapperComparator(
+        standardKeyObjectInspectors, standardKeyObjectInspectors, columnSortOrderIsDesc));
+
+    keyWrapper = new KeyWrapperFactory(keyFields, keyObjectInspectors,
+        standardKeyObjectInspectors).getKeyWrapper();
+    standardKeyWrapper = new KeyWrapperFactory(standardKeyFields, standardKeyObjectInspectors,
+        standardKeyObjectInspectors).getKeyWrapper();
+    // Buffer up to one vectorized batch worth of rows (DEFAULT_SIZE) before filtering
+    rowLimit = VectorizedRowBatch.DEFAULT_SIZE;
+    rows = new Object[rowLimit];
+    rowSize = 0;
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+    // Track the incoming row's key in the priority queue, capped at topN keys
+    keyWrapper.getNewKey(row, inputObjInspectors[0]);
+    keyWrapper.setHashKey();
+    if (!priorityQueue.contains(keyWrapper)) {
+      priorityQueue.offer(keyWrapper.copyKey());
+    }
+    if (priorityQueue.size() > topN) {
+      priorityQueue.poll();
+    }
+
+    // Buffer a standard copy of the row; filter and flush the buffer once it is full
+    rows[rowSize] = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[0]);
+    rowSize++;
+    if (rowSize % rowLimit == 0) {
+      processRows();
+    }
+  }
+
+  // Forwards buffered rows whose key is still queued, then resets the buffer and the queue.
+  private void processRows() throws HiveException {
+    for (int i = 0; i < rowSize; i++) {
+      Object row = rows[i];
+      standardKeyWrapper.getNewKey(row, outputObjInspector);
+      standardKeyWrapper.setHashKey();
+      if (priorityQueue.contains(standardKeyWrapper)) {
+        forward(row, outputObjInspector);
+      }
+    }
+    priorityQueue.clear();
+    rowSize = 0;
+  }
+
+  @Override
+  protected final void closeOp(boolean abort) throws HiveException {
+    // Flush the last, partially filled buffer unless the operator is being aborted
+    if (!abort) {
+      processRows();
+    }
+    super.closeOp(abort);
+  }
+
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "TNK";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return TOPNKEY;
+  }
+
+  // A TopNKeyOperator behaves like a FilterOperator whose predicate is the top N key condition,
+  // so it reports the same optimizer properties. The following methods return the same values
+  // as in FilterOperator: supportSkewJoinOptimization, columnNamesRowResolvedCanBeObtained,
+  // supportAutomaticSortMergeJoin, and supportUnionRemoveOptimization.
+  @Override
+  public boolean supportSkewJoinOptimization() {
+    return true;
+  }
+
+  @Override
+  public boolean columnNamesRowResolvedCanBeObtained() {
+    return true;
+  }
+
+  @Override
+  public boolean supportAutomaticSortMergeJoin() {
+    return true;
+  }
+
+  @Override
+  public boolean supportUnionRemoveOptimization() {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
new file mode 100644
index 0000000..6f29f88
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
+
+/**
+ * Vectorized TopNKeyOperator: per batch, forwards only rows whose key is among the batch's top N keys.
+ */
+public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements VectorizationOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ private VectorTopNKeyDesc vectorDesc;
+ private VectorizationContext vContext;
+
+ // Output column numbers and type infos of the key expressions
+ private int[] keyColumnNums;
+ private TypeInfo[] keyTypeInfos;
+
+ // Reusable buffer and extractor for the key columns of a single row
+ private transient Object[] singleRow;
+ private transient VectorExtractRow vectorExtractRow;
+
+ // Serializes extracted keys, using the configured key sort order, for the priority queue
+ private transient BinarySortableSerDe binarySortableSerDe;
+ private transient StructObjectInspector keyObjectInspector;
+
+ // Per-batch state: lazy-init flag, queue of serialized top N keys, scratch selected vector
+ private transient boolean firstBatch;
+ private transient PriorityQueue<Writable> priorityQueue;
+ private transient int[] temporarySelected;
+
+ public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
+ VectorizationContext vContext, VectorDesc vectorDesc) {
+ // Keep the descriptors and cache each key expression's output column and type
+ this(ctx);
+ this.conf = (TopNKeyDesc) conf;
+ this.vContext = vContext;
+ this.vectorDesc = (VectorTopNKeyDesc) vectorDesc;
+
+ VectorExpression[] keyExpressions = this.vectorDesc.getKeyExpressions();
+ final int numKeys = keyExpressions.length;
+ keyColumnNums = new int[numKeys];
+ keyTypeInfos = new TypeInfo[numKeys];
+
+ for (int i = 0; i < numKeys; i++) {
+ keyColumnNums[i] = keyExpressions[i].getOutputColumnNum();
+ keyTypeInfos[i] = keyExpressions[i].getOutputTypeInfo();
+ }
+ }
+
+ /** Kryo ctor. */
+ @VisibleForTesting
+ public VectorTopNKeyOperator() {
+ super();
+ }
+
+ public VectorTopNKeyOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ super.initializeOp(hconf);
+ // Prepare the key expressions for evaluation
+ VectorExpression.doTransientInit(vectorDesc.getKeyExpressions());
+ for (VectorExpression keyExpression : vectorDesc.getKeyExpressions()) {
+ keyExpression.init(hconf);
+ }
+ // The extractor, priority queue and serde are created lazily in the first process() call
+ this.firstBatch = true;
+ // Standard struct object inspector over the key columns, derived from the key output types
+ VectorExpression[] keyExpressions = vectorDesc.getKeyExpressions();
+ final int size = keyExpressions.length;
+ ObjectInspector[] fieldObjectInspectors = new ObjectInspector[size];
+
+ for (int i = 0; i < size; i++) {
+ VectorExpression keyExpression = keyExpressions[i];
+ fieldObjectInspectors[i] = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
+ keyExpression.getOutputTypeInfo());
+ }
+
+ keyObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ this.conf.getKeyColumnNames(), Arrays.asList(fieldObjectInspectors));
+ // NOTE(review): sized to DEFAULT_SIZE, yet process() copies batch.size entries into it
+ temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
+ }
+
+ @Override
+ public void process(Object data, int tag) throws HiveException {
+ VectorizedRowBatch batch = (VectorizedRowBatch) data;
+
+ // Save the batch's selected vector, size and selectedInUse flag and work on a copy of
+ // selected; all three are restored at the end of this method.
+ System.arraycopy(batch.selected, 0, temporarySelected, 0, batch.size);
+ int [] selectedBackup = batch.selected;
+ batch.selected = temporarySelected;
+ int sizeBackup = batch.size;
+ boolean selectedInUseBackup = batch.selectedInUse;
+ // Evaluate the key expressions into their output columns
+ for (VectorExpression keyExpression : vectorDesc.getKeyExpressions()) {
+ keyExpression.evaluate(batch);
+ }
+ // On the first batch, create the key row extractor, priority queue and BinarySortableSerDe
+ if (firstBatch) {
+ vectorExtractRow = new VectorExtractRow();
+ vectorExtractRow.init(keyObjectInspector, Ints.asList(keyColumnNums));
+ // NOTE(review): with reverse natural order, poll() presumably evicts the largest key; confirm
+ singleRow = new Object[vectorExtractRow.getCount()];
+ Comparator comparator = Comparator.reverseOrder();
+ priorityQueue = new PriorityQueue<Writable>(comparator);
+
+ try {
+ binarySortableSerDe = new BinarySortableSerDe();
+ Properties properties = new Properties();
+ Joiner joiner = Joiner.on(',');
+ properties.setProperty(serdeConstants.LIST_COLUMNS, joiner.join(conf.getKeyColumnNames()));
+ properties.setProperty(serdeConstants.LIST_COLUMN_TYPES, joiner.join(keyTypeInfos));
+ properties.setProperty(serdeConstants.SERIALIZATION_SORT_ORDER,
+ conf.getColumnSortOrder());
+ binarySortableSerDe.initialize(getConfiguration(), properties);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ }
+
+ firstBatch = false;
+ }
+
+ // Top N keys are computed independently for every batch
+ priorityQueue.clear();
+
+ // Pass 1: collect the top N distinct serialized keys of this batch
+ for (int i = 0; i < batch.size; i++) {
+
+ // Map logical index i to physical row j and extract its key columns
+ int j;
+ if (batch.selectedInUse) {
+ j = batch.selected[i];
+ } else {
+ j = i;
+ }
+ vectorExtractRow.extractRow(batch, j, singleRow);
+
+ Writable keysWritable;
+ try {
+ keysWritable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ }
+
+ // Queue a clone of a new key (presumably serialize() reuses its output Writable)
+ if (!priorityQueue.contains(keysWritable)) {
+ priorityQueue.offer(WritableUtils.clone(keysWritable, getConfiguration()));
+ }
+
+ // Evict one key once the queue holds more than topN keys
+ if (priorityQueue.size() > conf.getTopN()) {
+ priorityQueue.poll();
+ }
+ }
+
+ // Pass 2: keep only the rows whose serialized key is still in the queue
+ int size = 0;
+ int[] selected = new int[batch.selected.length];
+ for (int i = 0; i < batch.size; i++) {
+ int j;
+ if (batch.selectedInUse) {
+ j = batch.selected[i];
+ } else {
+ j = i;
+ }
+
+ // Extract the key columns of row j
+ vectorExtractRow.extractRow(batch, j, singleRow);
+
+ Writable keysWritable;
+ try {
+ keysWritable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ }
+
+ // Keep the row if its key is one of this batch's top N keys
+ if (priorityQueue.contains(keysWritable)) {
+ selected[size++] = j;
+ }
+ }
+
+ // Replace the selection only when some rows were filtered out
+ if (batch.size != size) {
+ batch.selectedInUse = true;
+ batch.selected = selected;
+ batch.size = size;
+ }
+
+ // Forward only a non-empty batch
+ if (size > 0) {
+ forward(batch, null, true);
+ }
+
+ // Restore the saved selected vector, size and selectedInUse flag
+ batch.selected = selectedBackup;
+ batch.size = sizeBackup;
+ batch.selectedInUse = selectedInUseBackup;
+ }
+
+ @Override
+ public String getName() {
+ return TopNKeyOperator.getOperatorName();
+ }
+
+ @Override
+ public OperatorType getType() {
+ return TOPNKEY;
+ }
+
+ @Override
+ public VectorizationContext getInputVectorizationContext() {
+ return vContext;
+ }
+
+ @Override
+ public VectorDesc getVectorDesc() {
+ return vectorDesc;
+ }
+
+ // A TopNKeyOperator behaves like a FilterOperator whose predicate is the top N key condition,
+ // so it reports the same optimizer properties. The following methods return the same values
+ // as in FilterOperator: supportSkewJoinOptimization, columnNamesRowResolvedCanBeObtained,
+ // supportAutomaticSortMergeJoin, and supportUnionRemoveOptimization.
+ @Override
+ public boolean supportSkewJoinOptimization() {
+ return true;
+ }
+
+ @Override
+ public boolean columnNamesRowResolvedCanBeObtained() {
+ return true;
+ }
+
+ @Override
+ public boolean supportAutomaticSortMergeJoin() {
+ return true;
+ }
+
+ @Override
+ public boolean supportUnionRemoveOptimization() {
+ return true;
+ }
+
+ // Pass the group status on to all children, e.g. a downstream VectorPTFOperator
+ @Override
+ public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException {
+ for (Operator<? extends OperatorDesc> op : childOperators) {
+ op.setNextVectorBatchGroupStatus(isLastGroupBatch);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
new file mode 100644
index 0000000..721a9b9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+/**
+ * TopNKeyProcessor inserts a TopNKeyOperator between a HASH-mode GroupByOperator without distinct
+ * aggregations and its parent, when the GroupByOperator's child ReduceSinkOperator orders by the
+ * same keys and carries a top N. Nothing is inserted if the parent already is a TopNKeyOperator.
+ */
+public class TopNKeyProcessor implements NodeProcessor {
+  private static final Logger LOG = LoggerFactory.getLogger(TopNKeyProcessor.class);
+
+  public TopNKeyProcessor() {
+  }
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+    // Get ReduceSinkOperator
+    ReduceSinkOperator reduceSinkOperator = (ReduceSinkOperator) nd;
+    ReduceSinkDesc reduceSinkDesc = reduceSinkOperator.getConf();
+
+    // Get GroupByOperator; the cast presumes the matching rule only fires for GBY -> RS
+    GroupByOperator groupByOperator = (GroupByOperator) reduceSinkOperator.getParentOperators().get(0);
+    GroupByDesc groupByDesc = groupByOperator.getConf();
+
+    // Check whether the reduce sink operator contains top n
+    if (!reduceSinkDesc.isOrdering() || reduceSinkDesc.getTopN() < 0) {
+      return null;
+    }
+
+    // Check whether the group by operator is in hash mode
+    if (groupByDesc.getMode() != GroupByDesc.Mode.HASH) {
+      return null;
+    }
+
+    // Check whether the group by operator has distinct aggregations
+    if (groupByDesc.isDistinct()) {
+      return null;
+    }
+
+    // Check whether RS keys are same as GBY keys
+    List<ExprNodeDesc> groupByKeyColumns = groupByDesc.getKeys();
+    List<ExprNodeDesc> mappedColumns = new ArrayList<>();
+    for (ExprNodeDesc columns : reduceSinkDesc.getKeyCols()) {
+      mappedColumns.add(groupByDesc.getColumnExprMap().get(columns.getExprString()));
+    }
+    if (!ExprNodeDescUtils.isSame(mappedColumns, groupByKeyColumns)) {
+      return null;
+    }
+
+    // Only rewire a GBY with exactly one parent that is not already a top n key operator
+    List<Operator<? extends OperatorDesc>> parents = groupByOperator.getParentOperators();
+    if (parents.size() != 1 || parents.get(0) instanceof TopNKeyOperator) {
+      return null;
+    }
+    Operator<? extends OperatorDesc> parentOperator = parents.get(0);
+
+    // Insert a new top n key operator between the group by operator and its parent. It forwards
+    // the parent's rows unchanged, so it takes the parent's schema, not the GBY output schema.
+    TopNKeyDesc topNKeyDesc = new TopNKeyDesc(reduceSinkDesc.getTopN(), reduceSinkDesc.getOrder(),
+        groupByKeyColumns);
+    Operator<? extends OperatorDesc> newOperator = OperatorFactory.getAndMakeChild(
+        groupByOperator.getCompilationOpContext(), (OperatorDesc) topNKeyDesc,
+        new RowSchema(parentOperator.getSchema()), parents);
+    newOperator.getChildOperators().add(groupByOperator);
+    groupByOperator.getParentOperators().add(newOperator);
+    parentOperator.removeChild(groupByOperator);
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 7afbf04..1946cec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -124,6 +124,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
import org.apache.hadoop.hive.ql.plan.VectorDesc;
import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
@@ -135,6 +136,7 @@ import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc;
import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc;
import org.apache.hadoop.hive.ql.plan.VectorSparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
import org.apache.hadoop.hive.ql.plan.VectorLimitDesc;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc;
@@ -2555,6 +2557,10 @@ public class Vectorizer implements PhysicalPlanResolver {
desc, "Predicate", VectorExpressionDescriptor.Mode.FILTER, /* allowComplex */ true);
}
+  /**
+   * Validates that all key column expressions of a Top N Key operator pass the
+   * vectorizer's expression validation; if not, the caller refuses to vectorize it.
+   */
+  private boolean validateTopNKeyOperator(TopNKeyOperator op) {
+    TopNKeyDesc desc = op.getConf();
+    return validateExprNodeDesc(desc.getKeyColumns(), "Key columns");
+  }
private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce,
boolean isTezOrSpark, VectorGroupByDesc vectorGroupByDesc) {
@@ -4155,6 +4161,20 @@ public class Vectorizer implements PhysicalPlanResolver {
vContext, vectorFilterDesc);
}
+ private static Operator<? extends OperatorDesc> vectorizeTopNKeyOperator(
+ Operator<? extends OperatorDesc> topNKeyOperator, VectorizationContext vContext,
+ VectorTopNKeyDesc vectorTopNKeyDesc) throws HiveException {
+
+ TopNKeyDesc topNKeyDesc = (TopNKeyDesc) topNKeyOperator.getConf();
+
+ List<ExprNodeDesc> keyColumns = topNKeyDesc.getKeyColumns();
+ VectorExpression[] keyExpressions = vContext.getVectorExpressions(keyColumns);
+ vectorTopNKeyDesc.setKeyExpressions(keyExpressions);
+ return OperatorFactory.getVectorOperator(
+ topNKeyOperator.getCompilationOpContext(), topNKeyDesc,
+ vContext, vectorTopNKeyDesc);
+ }
+
private static Class<? extends VectorAggregateExpression> findVecAggrClass(
Class<? extends VectorAggregateExpression>[] vecAggrClasses,
String aggregateName, ColumnVector.Type inputColVectorType,
@@ -5035,6 +5055,23 @@ public class Vectorizer implements PhysicalPlanResolver {
}
}
break;
+ case TOPNKEY:
+ {
+ if (!validateTopNKeyOperator((TopNKeyOperator) op)) {
+ throw new VectorizerCannotVectorizeException();
+ }
+
+ VectorTopNKeyDesc vectorTopNKeyDesc = new VectorTopNKeyDesc();
+ vectorOp = vectorizeTopNKeyOperator(op, vContext, vectorTopNKeyDesc);
+ isNative = true;
+ if (vectorTaskColumnInfo != null) {
+ VectorExpression[] keyExpressions = vectorTopNKeyDesc.getKeyExpressions();
+ if (usesVectorUDFAdaptor(keyExpressions)) {
+ vectorTaskColumnInfo.setUsesVectorUDFAdaptor(true);
+ }
+ }
+ }
+ break;
case SELECT:
{
if (!validateSelectOperator((SelectOperator) op)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index dfd7908..7ba3137 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.ForwardWalker;
@@ -77,6 +78,7 @@ import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer;
+import org.apache.hadoop.hive.ql.optimizer.TopNKeyProcessor;
import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkJoinDeDuplication;
import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
@@ -141,6 +143,10 @@ public class TezCompiler extends TaskCompiler {
OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs);
perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+ runTopNKeyOptimization(procCtx);
+ perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run top n key optimization");
+
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
// setup dynamic partition pruning where possible
runDynamicPartitionPruning(procCtx, inputs, outputs);
perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Setup dynamic partition pruning");
@@ -1006,6 +1012,27 @@ public class TezCompiler extends TaskCompiler {
ogw.startWalking(topNodes, null);
}
+  /**
+   * Walks the operator tree from every table scan and applies
+   * {@link TopNKeyProcessor} wherever a ReduceSink sits directly below a GroupBy.
+   * Does nothing unless {@code HIVE_OPTIMIZE_TOPNKEY} is enabled.
+   */
+  private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx)
+      throws SemanticException {
+    if (procCtx.conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_TOPNKEY)) {
+      // Fires when the walker reaches a ReduceSink whose predecessor on the stack is a GroupBy.
+      String pattern = GroupByOperator.getOperatorName() + "%"
+          + ReduceSinkOperator.getOperatorName() + "%";
+      Map<Rule, NodeProcessor> rules = new LinkedHashMap<>();
+      rules.put(new RuleRegExp("Top n key optimization", pattern), new TopNKeyProcessor());
+
+      // Null default processor: only nodes matching the rule above are processed.
+      Dispatcher dispatcher = new DefaultRuleDispatcher(null, rules, procCtx);
+      GraphWalker walker = new DefaultGraphWalker(dispatcher);
+      List<Node> startNodes = new ArrayList<Node>(procCtx.parseContext.getTopOps().values());
+      walker.startWalking(startNodes, null);
+    }
+  }
+
private boolean findParallelSemiJoinBranch(Operator<?> mapjoin, TableScanOperator bigTableTS,
ParseContext parseContext,
Map<ReduceSinkOperator, TableScanOperator> semijoins) {
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
new file mode 100644
index 0000000..c62c4a9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Plan descriptor for the Top N Key operator.
+ *
+ * <p>Holds the {@code topN} limit, the key sort-order string (one character per
+ * key column, e.g. {@code "++"}) and the key column expressions. The no-arg
+ * constructor leaves every field unset ({@code keyColumns} is {@code null}).
+ * Accessors that walk the key list therefore treat a missing list as empty.
+ */
+@Explain(displayName = "Top N Key Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class TopNKeyDesc extends AbstractOperatorDesc {
+  private static final long serialVersionUID = 1L;
+
+  private int topN;
+  private String columnSortOrder;
+  private List<ExprNodeDesc> keyColumns;
+
+  /** Creates an empty descriptor; fields are populated through the setters (as {@link #clone()} does). */
+  public TopNKeyDesc() {
+  }
+
+  /**
+   * @param topN the N of top-N (the top-n key optimizer passes the ReduceSink's top-N value)
+   * @param columnSortOrder sort order per key column, e.g. {@code "+"} or {@code "++"}
+   * @param keyColumns key column expressions; the list is stored as-is, not copied
+   */
+  public TopNKeyDesc(
+      final int topN,
+      final String columnSortOrder,
+      final List<ExprNodeDesc> keyColumns) {
+
+    this.topN = topN;
+    this.columnSortOrder = columnSortOrder;
+    this.keyColumns = keyColumns;
+  }
+
+  @Explain(displayName = "top n", explainLevels = { Level.DEFAULT, Level.EXTENDED, Level.USER })
+  public int getTopN() {
+    return topN;
+  }
+
+  public void setTopN(int topN) {
+    this.topN = topN;
+  }
+
+  @Explain(displayName = "sort order", explainLevels = { Level.DEFAULT, Level.EXTENDED, Level.USER })
+  public String getColumnSortOrder() {
+    return columnSortOrder;
+  }
+
+  public void setColumnSortOrder(String columnSortOrder) {
+    this.columnSortOrder = columnSortOrder;
+  }
+
+  /** Key expressions rendered for the default/extended EXPLAIN output. */
+  @Explain(displayName = "keys")
+  public String getKeyString() {
+    return PlanUtils.getExprListString(keyColumns);
+  }
+
+  /** Key expressions rendered for the user-level EXPLAIN output. */
+  @Explain(displayName = "keys", explainLevels = { Level.USER })
+  public String getUserLevelExplainKeyString() {
+    return PlanUtils.getExprListString(keyColumns, true);
+  }
+
+  public List<ExprNodeDesc> getKeyColumns() {
+    return keyColumns;
+  }
+
+  public void setKeyColumns(List<ExprNodeDesc> keyColumns) {
+    this.keyColumns = keyColumns;
+  }
+
+  /**
+   * Returns the expression string of every key column, in key order.
+   *
+   * @return a new mutable list; empty when no key columns have been set
+   */
+  public List<String> getKeyColumnNames() {
+    List<String> ret = new ArrayList<>();
+    if (keyColumns == null) {
+      // A descriptor built with the no-arg constructor has no key list yet;
+      // clone() tolerates that state, so this accessor does too.
+      return ret;
+    }
+    for (ExprNodeDesc keyColumn : keyColumns) {
+      ret.add(keyColumn.getExprString());
+    }
+    return ret;
+  }
+
+  /**
+   * Two descriptors are the same when they have the same class and agree on top N,
+   * sort order and key expressions. A {@code null} argument is never the same.
+   */
+  @Override
+  public boolean isSame(OperatorDesc other) {
+    if (other != null && getClass().getName().equals(other.getClass().getName())) {
+      TopNKeyDesc otherDesc = (TopNKeyDesc) other;
+      return getTopN() == otherDesc.getTopN() &&
+          Objects.equals(columnSortOrder, otherDesc.columnSortOrder) &&
+          ExprNodeDescUtils.isSame(keyColumns, otherDesc.keyColumns);
+    }
+    return false;
+  }
+
+  /**
+   * Returns a copy with the same top N and sort order. The key list is a new list
+   * holding the same expression objects (shallow copy); a null list stays null.
+   */
+  @Override
+  public Object clone() {
+    TopNKeyDesc ret = new TopNKeyDesc();
+    ret.setTopN(topN);
+    ret.setColumnSortOrder(columnSortOrder);
+    ret.setKeyColumns(getKeyColumns() == null ? null : new ArrayList<>(getKeyColumns()));
+    return ret;
+  }
+
+  /** Vectorization details reported by EXPLAIN VECTORIZATION for this operator. */
+  public class TopNKeyDescExplainVectorization extends OperatorExplainVectorization {
+    // NOTE(review): not read by any getter in this class.
+    private final TopNKeyDesc topNKeyDesc;
+    private final VectorTopNKeyDesc vectorTopNKeyDesc;
+
+    public TopNKeyDescExplainVectorization(TopNKeyDesc topNKeyDesc, VectorTopNKeyDesc vectorTopNKeyDesc) {
+      super(vectorTopNKeyDesc, true);
+      this.topNKeyDesc = topNKeyDesc;
+      this.vectorTopNKeyDesc = vectorTopNKeyDesc;
+    }
+
+    @Explain(vectorization = Explain.Vectorization.OPERATOR, displayName = "keyExpressions", explainLevels = { Level.DEFAULT, Level.EXTENDED })
+    public List<String> getKeyExpressions() {
+      return vectorExpressionsToStringList(vectorTopNKeyDesc.getKeyExpressions());
+    }
+  }
+
+  /** Returns the vectorization explain view, or {@code null} when no {@link VectorTopNKeyDesc} is attached. */
+  @Explain(vectorization = Explain.Vectorization.OPERATOR, displayName = "Top N Key Vectorization", explainLevels = { Level.DEFAULT, Level.EXTENDED })
+  public TopNKeyDescExplainVectorization getTopNKeyVectorization() {
+    VectorTopNKeyDesc vectorTopNKeyDesc = (VectorTopNKeyDesc) getVectorDesc();
+    if (vectorTopNKeyDesc == null) {
+      return null;
+    }
+    return new TopNKeyDescExplainVectorization(this, vectorTopNKeyDesc);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java
new file mode 100644
index 0000000..9a266a0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+
+/**
+ * Vector-mode descriptor for the Top N Key operator.
+ *
+ * <p>Carries the vector expressions that evaluate the operator's key columns.
+ * The array is stored and returned as-is; it is not copied.
+ */
+public class VectorTopNKeyDesc extends AbstractVectorDesc {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Vector expressions for the key columns; null until assigned via the setter. */
+  private VectorExpression[] keyExpressions;
+
+  /** Creates a descriptor with no key expressions assigned. */
+  public VectorTopNKeyDesc() {
+  }
+
+  /** Replaces the key column vector expressions. */
+  public void setKeyExpressions(VectorExpression[] keyExpressions) {
+    this.keyExpressions = keyExpressions;
+  }
+
+  /** Returns the key column vector expressions (the stored array itself). */
+  public VectorExpression[] getKeyExpressions() {
+    return keyExpressions;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/test/queries/clientpositive/topnkey.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/topnkey.q b/ql/src/test/queries/clientpositive/topnkey.q
new file mode 100644
index 0000000..e02a41d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/topnkey.q
@@ -0,0 +1,31 @@
+--! qt:dataset:src
+set hive.mapred.mode=nonstrict;
+set hive.vectorized.execution.enabled=false;
+set hive.optimize.topnkey=true;
+
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.tez.min.bloom.filter.entries=1;
+
+set hive.stats.fetch.column.stats=true;
+set hive.cbo.enable=true;
+
+EXPLAIN
+SELECT key, SUM(CAST(SUBSTR(value,5) AS INT)) FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+SELECT key, SUM(CAST(SUBSTR(value,5) AS INT)) FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+EXPLAIN
+SELECT key FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+SELECT key FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+explain vectorization detail
+SELECT src1.key, src2.value FROM src src1 JOIN src src2 ON (src1.key = src2.key) ORDER BY src1.key LIMIT 5;
+
+SELECT src1.key, src2.value FROM src src1 JOIN src src2 ON (src1.key = src2.key) ORDER BY src1.key LIMIT 5;
+
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/test/queries/clientpositive/vector_topnkey.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vector_topnkey.q b/ql/src/test/queries/clientpositive/vector_topnkey.q
new file mode 100644
index 0000000..e1b7d26
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/vector_topnkey.q
@@ -0,0 +1,30 @@
+--! qt:dataset:src
+set hive.mapred.mode=nonstrict;
+set hive.vectorized.execution.enabled=true;
+set hive.optimize.topnkey=true;
+
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.tez.min.bloom.filter.entries=1;
+
+set hive.stats.fetch.column.stats=true;
+set hive.cbo.enable=true;
+
+explain vectorization detail
+SELECT key, SUM(CAST(SUBSTR(value,5) AS INT)) FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+SELECT key, SUM(CAST(SUBSTR(value,5) AS INT)) FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+explain vectorization detail
+SELECT key FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+SELECT key FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+explain vectorization detail
+SELECT src1.key, src2.value FROM src src1 JOIN src src2 ON (src1.key = src2.key) ORDER BY src1.key LIMIT 5;
+
+SELECT src1.key, src2.value FROM src src1 JOIN src src2 ON (src1.key = src2.key) ORDER BY src1.key LIMIT 5;
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out b/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out
index 4925de5..1481996 100644
--- a/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out
+++ b/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out
@@ -67,19 +67,24 @@ STAGE PLANS:
expressions: key (type: string)
outputColumnNames: key
Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
+ Top N Key Operator
+ sort order: +
keys: key (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col1 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
@@ -194,19 +199,24 @@ STAGE PLANS:
expressions: key (type: string)
outputColumnNames: key
Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
+ Top N Key Operator
+ sort order: +
keys: key (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col1 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
@@ -295,19 +305,24 @@ STAGE PLANS:
expressions: length(key) (type: int)
outputColumnNames: _col0
Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
+ Top N Key Operator
+ sort order: +
keys: _col0 (type: int)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: int)
- sort order: +
- Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ keys: _col0 (type: int)
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col1 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
@@ -376,19 +391,24 @@ STAGE PLANS:
expressions: abs(length(key)) (type: int)
outputColumnNames: _col0
Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
+ Top N Key Operator
+ sort order: +
keys: _col0 (type: int)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: int)
- sort order: +
- Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ keys: _col0 (type: int)
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col1 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
@@ -458,19 +478,24 @@ STAGE PLANS:
expressions: key (type: string)
outputColumnNames: key
Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
+ Top N Key Operator
+ sort order: +
keys: key (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col1 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
@@ -560,19 +585,24 @@ STAGE PLANS:
expressions: value (type: string)
outputColumnNames: value
Statistics: Num rows: 500 Data size: 45500 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
+ Top N Key Operator
+ sort order: +
keys: value (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 45500 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ keys: value (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col1 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
@@ -1160,20 +1190,25 @@ STAGE PLANS:
expressions: key (type: string)
outputColumnNames: key
Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
- bucketGroup: true
+ Top N Key Operator
+ sort order: +
keys: key (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ bucketGroup: true
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col1 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
@@ -1263,19 +1298,24 @@ STAGE PLANS:
expressions: value (type: string)
outputColumnNames: value
Statistics: Num rows: 500 Data size: 45500 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
+ Top N Key Operator
+ sort order: +
keys: value (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 45500 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ keys: value (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col1 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
@@ -1465,20 +1505,25 @@ STAGE PLANS:
expressions: key (type: string)
outputColumnNames: key
Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
- bucketGroup: true
+ Top N Key Operator
+ sort order: +
keys: key (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ bucketGroup: true
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col1 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
@@ -1568,19 +1613,24 @@ STAGE PLANS:
expressions: key (type: string), value (type: string)
outputColumnNames: key, value
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
+ Top N Key Operator
+ sort order: ++
keys: key (type: string), value (type: string)
- mode: hash
- outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 250 Data size: 46500 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: count()
+ keys: key (type: string), value (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 250 Data size: 46500 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col2 (type: bigint)
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 250 Data size: 46500 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col2 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
Reducer 2
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/test/results/clientpositive/llap/check_constraint.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/check_constraint.q.out b/ql/src/test/results/clientpositive/llap/check_constraint.q.out
index 085b003..b7e6da9 100644
--- a/ql/src/test/results/clientpositive/llap/check_constraint.q.out
+++ b/ql/src/test/results/clientpositive/llap/check_constraint.q.out
@@ -1674,19 +1674,24 @@ STAGE PLANS:
expressions: key (type: string), value (type: string), UDFToInteger(key) (type: int), CAST( key AS decimal(5,2)) (type: decimal(5,2))
outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: min(_col2), max(_col3)
+ Top N Key Operator
+ sort order: ++
keys: _col0 (type: string), _col1 (type: string)
- mode: hash
- outputColumnNames: _col0, _col1, _col2, _col3
- Statistics: Num rows: 250 Data size: 73500 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ top n: 10
+ Group By Operator
+ aggregations: min(_col2), max(_col3)
+ keys: _col0 (type: string), _col1 (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 250 Data size: 73500 Basic stats: COMPLETE Column stats: COMPLETE
- TopN Hash Memory Usage: 0.1
- value expressions: _col2 (type: int), _col3 (type: decimal(5,2))
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 250 Data size: 73500 Basic stats: COMPLETE Column stats: COMPLETE
+ TopN Hash Memory Usage: 0.1
+ value expressions: _col2 (type: int), _col3 (type: decimal(5,2))
Execution mode: vectorized, llap
LLAP IO: no inputs
Reducer 2
http://git-wip-us.apache.org/repos/asf/hive/blob/cc294d32/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
index 7b99494..56eb548 100644
--- a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
+++ b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
@@ -1264,19 +1264,21 @@ Stage-0
PartitionCols:_col0, _col1
Group By Operator [GBY_7] (rows=5 width=20)
Output:["_col0","_col1","_col2"],aggregations:["count()"],keys:_col1, _col0
- Select Operator [SEL_5] (rows=10 width=101)
- Output:["_col0","_col1"]
- Group By Operator [GBY_4] (rows=10 width=101)
- Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
- <-Map 1 [SIMPLE_EDGE] llap
- SHUFFLE [RS_3]
- PartitionCols:_col0, _col1, _col2
- Group By Operator [GBY_2] (rows=10 width=101)
- Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
- Select Operator [SEL_1] (rows=20 width=88)
- Output:["key","c_int","c_float"]
- TableScan [TS_0] (rows=20 width=88)
- default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
+ Top N Key Operator [TNK_15] (rows=10 width=101)
+ keys:_col1, _col0,sort order:++,top n:1
+ Select Operator [SEL_5] (rows=10 width=101)
+ Output:["_col0","_col1"]
+ Group By Operator [GBY_4] (rows=10 width=101)
+ Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
+ <-Map 1 [SIMPLE_EDGE] llap
+ SHUFFLE [RS_3]
+ PartitionCols:_col0, _col1, _col2
+ Group By Operator [GBY_2] (rows=10 width=101)
+ Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
+ Select Operator [SEL_1] (rows=20 width=88)
+ Output:["key","c_int","c_float"]
+ TableScan [TS_0] (rows=20 width=88)
+ default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
PREHOOK: query: explain select key from(select key from (select key from cbo_t1 limit 5)cbo_t2 limit 5)cbo_t3 limit 5
PREHOOK: type: QUERY