You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/07/19 20:59:56 UTC

[18/18] hive git commit: HIVE-17896: TopNKey: Create a standalone vectorizable TopNKey operator (Teddy Choi, reviewed by Jesus Camacho Rodriguez)

HIVE-17896: TopNKey: Create a standalone vectorizable TopNKey operator (Teddy Choi, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/851c8aba
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/851c8aba
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/851c8aba

Branch: refs/heads/master
Commit: 851c8aba86aa027cc5aa21e8b71e04a1243c35b9
Parents: e867d1c
Author: Teddy Choi <pu...@gmail.com>
Authored: Thu Jul 19 13:55:57 2018 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Jul 19 13:55:57 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    2 +
 .../test/resources/testconfiguration.properties |    6 +-
 .../hadoop/hive/ql/plan/api/OperatorType.java   |    5 +-
 .../hadoop/hive/ql/exec/KeyWrapperFactory.java  |    2 +-
 .../hadoop/hive/ql/exec/OperatorFactory.java    |    4 +
 .../hadoop/hive/ql/exec/TopNKeyOperator.java    |  214 ++
 .../ql/exec/vector/VectorTopNKeyOperator.java   |  304 +++
 .../hive/ql/optimizer/TopNKeyProcessor.java     |  109 +
 .../hive/ql/optimizer/physical/Vectorizer.java  |   37 +
 .../hadoop/hive/ql/parse/TezCompiler.java       |   27 +
 .../apache/hadoop/hive/ql/plan/TopNKeyDesc.java |  139 ++
 .../hadoop/hive/ql/plan/VectorTopNKeyDesc.java  |   39 +
 ql/src/test/queries/clientpositive/topnkey.q    |   31 +
 .../queries/clientpositive/vector_topnkey.q     |   30 +
 .../clientpositive/llap/bucket_groupby.q.out    |  274 ++-
 .../clientpositive/llap/check_constraint.q.out  |   27 +-
 .../clientpositive/llap/explainuser_1.q.out     |   28 +-
 .../clientpositive/llap/explainuser_2.q.out     |  406 ++--
 .../clientpositive/llap/limit_pushdown.q.out    |  135 +-
 .../clientpositive/llap/limit_pushdown3.q.out   |   89 +-
 .../llap/llap_decimal64_reader.q.out            |   46 +-
 .../clientpositive/llap/offset_limit.q.out      |   27 +-
 .../llap/offset_limit_ppd_optimizer.q.out       |   85 +-
 .../llap/orc_struct_type_vectorization.q.out    |   53 +-
 .../parquet_complex_types_vectorization.q.out   |  159 +-
 .../llap/parquet_map_type_vectorization.q.out   |   53 +-
 .../parquet_struct_type_vectorization.q.out     |   53 +-
 .../results/clientpositive/llap/topnkey.q.out   |  318 +++
 .../llap/vector_cast_constant.q.out             |   55 +-
 .../clientpositive/llap/vector_char_2.q.out     |  110 +-
 .../vector_groupby_grouping_sets_limit.q.out    |  346 +--
 .../llap/vector_groupby_reduce.q.out            |   49 +-
 .../llap/vector_mr_diff_schema_alias.q.out      |   25 +-
 .../llap/vector_reduce_groupby_decimal.q.out    |   53 +-
 .../llap/vector_string_concat.q.out             |   47 +-
 .../clientpositive/llap/vector_topnkey.q.out    |  592 +++++
 .../llap/vectorization_limit.q.out              |   63 +-
 .../clientpositive/perf/tez/query10.q.out       |  346 +--
 .../clientpositive/perf/tez/query14.q.out       | 2198 +++++++++---------
 .../clientpositive/perf/tez/query15.q.out       |  138 +-
 .../clientpositive/perf/tez/query17.q.out       |  372 +--
 .../clientpositive/perf/tez/query25.q.out       |  366 +--
 .../clientpositive/perf/tez/query26.q.out       |  226 +-
 .../clientpositive/perf/tez/query27.q.out       |  230 +-
 .../clientpositive/perf/tez/query29.q.out       |  374 +--
 .../clientpositive/perf/tez/query35.q.out       |  346 +--
 .../clientpositive/perf/tez/query37.q.out       |  142 +-
 .../clientpositive/perf/tez/query40.q.out       |  206 +-
 .../clientpositive/perf/tez/query43.q.out       |  128 +-
 .../clientpositive/perf/tez/query45.q.out       |  272 +--
 .../clientpositive/perf/tez/query49.q.out       |  478 ++--
 .../clientpositive/perf/tez/query5.q.out        |  542 ++---
 .../clientpositive/perf/tez/query50.q.out       |  250 +-
 .../clientpositive/perf/tez/query60.q.out       |  546 ++---
 .../clientpositive/perf/tez/query66.q.out       |  452 ++--
 .../clientpositive/perf/tez/query69.q.out       |  364 +--
 .../clientpositive/perf/tez/query7.q.out        |  226 +-
 .../clientpositive/perf/tez/query76.q.out       |  356 +--
 .../clientpositive/perf/tez/query77.q.out       |  562 ++---
 .../clientpositive/perf/tez/query8.q.out        |  276 +--
 .../clientpositive/perf/tez/query80.q.out       |  756 +++---
 .../clientpositive/perf/tez/query82.q.out       |  142 +-
 .../clientpositive/perf/tez/query99.q.out       |  230 +-
 .../results/clientpositive/tez/topnkey.q.out    |  162 ++
 .../clientpositive/tez/vector_topnkey.q.out     |  162 ++
 .../test/results/clientpositive/topnkey.q.out   |  301 +++
 .../results/clientpositive/vector_topnkey.q.out |  480 ++++
 .../objectinspector/ObjectInspectorUtils.java   |   19 +
 68 files changed, 9530 insertions(+), 6160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4ed1636..e630e88 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2233,6 +2233,8 @@ public class HiveConf extends Configuration {
         "If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime\n" +
         "would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."),
 
+    HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable top n key optimizer."),
+
     HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
         "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" +
         "and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."),

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 870a9b6..d5a33bd 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -26,9 +26,11 @@ disabled.query.files=ql_rewrite_gbtoidx.q,\
 minitez.query.files.shared=delete_orig_table.q,\
   orc_merge12.q,\
   orc_vectorization_ppd.q,\
+  topnkey.q,\
   update_orig_table.q,\
   vector_join_part_col_char.q,\
-  vector_non_string_partition.q
+  vector_non_string_partition.q,\
+  vector_topnkey.q
 
 # NOTE: Add tests to minitez only if it is very
 # specific to tez and cannot be added to minillap.
@@ -209,6 +211,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   subquery_exists.q,\
   subquery_in.q,\
   temp_table.q,\
+  topnkey.q,\
   union2.q,\
   union3.q,\
   union4.q,\
@@ -315,6 +318,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   vector_reduce_groupby_duplicate_cols.q,\
   vector_string_concat.q,\
   vector_struct_in.q,\
+  vector_topnkey.q,\
   vector_udf_character_length.q,\
   vector_udf_octet_length.q,\
   vector_varchar_4.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
index a002348..f8328be 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
@@ -37,7 +37,8 @@ public enum OperatorType implements org.apache.thrift.TEnum {
   ORCFILEMERGE(22),
   RCFILEMERGE(23),
   MERGEJOIN(24),
-  SPARKPRUNINGSINK(25);
+  SPARKPRUNINGSINK(25),
+  TOPNKEY(26);
 
   private final int value;
 
@@ -110,6 +111,8 @@ public enum OperatorType implements org.apache.thrift.TEnum {
         return MERGEJOIN;
       case 25:
         return SPARKPRUNINGSINK;
+      case 26:
+        return TOPNKEY;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
index 71ee25d..f1bf902 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
@@ -168,7 +168,6 @@ public class KeyWrapperFactory {
     }
   }
 
-  transient Object[] singleEleArray = new Object[1];
   transient StringObjectInspector soi_new, soi_copy;
 
   class TextKeyWrapper extends KeyWrapper {
@@ -180,6 +179,7 @@ public class KeyWrapperFactory {
     int hashcode;
     Object key;
     boolean isCopy;
+    transient Object[] singleEleArray = new Object[1];
 
     public TextKeyWrapper(boolean isCopy) {
       this(-1, null, isCopy);

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index 7bb6590..b61d37e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
 import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hive.ql.plan.ScriptDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
 import org.apache.hadoop.hive.ql.plan.VectorDesc;
@@ -126,6 +128,7 @@ public final class OperatorFactory {
     opvec.put(OrcFileMergeDesc.class, OrcFileMergeOperator.class);
     opvec.put(CommonMergeJoinDesc.class, CommonMergeJoinOperator.class);
     opvec.put(ListSinkDesc.class, ListSinkOperator.class);
+    opvec.put(TopNKeyDesc.class, TopNKeyOperator.class);
   }
 
   static {
@@ -143,6 +146,7 @@ public final class OperatorFactory {
     vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class);
     vectorOpvec.put(PTFDesc.class, VectorPTFOperator.class);
     vectorOpvec.put(SparkHashTableSinkDesc.class, VectorSparkHashTableSinkOperator.class);
+    vectorOpvec.put(TopNKeyDesc.class, VectorTopNKeyOperator.class);
   }
 
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
new file mode 100644
index 0000000..3dfeeaf
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
+
+/**
+ * TopNKeyOperator passes rows that contains top N keys only.
+ */
+public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // Maximum number of keys to hold
+  private transient int topN;
+
+  // Priority queue that holds occurred keys
+  private transient PriorityQueue<KeyWrapper> priorityQueue;
+
+  // Fast key wrapper in input format for fast comparison
+  private transient KeyWrapper keyWrapper;
+
+  // Standard key wrapper in standard format for output
+  private transient KeyWrapper standardKeyWrapper;
+
+  // Maximum number of rows
+  private transient int rowLimit;
+
+  // Current number of rows
+  private transient int rowSize;
+
+  // Rows
+  private transient Object[] rows;
+
+  /** Kryo ctor. */
+  public TopNKeyOperator() {
+    super();
+  }
+
+  public TopNKeyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public static class KeyWrapperComparator implements Comparator<KeyWrapper> {
+    private ObjectInspector[] objectInspectors1;
+    private ObjectInspector[] objectInspectors2;
+    private boolean[] columnSortOrderIsDesc;
+
+    public KeyWrapperComparator(ObjectInspector[] objectInspectors1, ObjectInspector[]
+        objectInspectors2, boolean[] columnSortOrderIsDesc) {
+      this.objectInspectors1 = objectInspectors1;
+      this.objectInspectors2 = objectInspectors2;
+      this.columnSortOrderIsDesc = columnSortOrderIsDesc;
+    }
+
+    @Override
+    public int compare(KeyWrapper key1, KeyWrapper key2) {
+      return ObjectInspectorUtils.compare(key1.getKeyArray(), objectInspectors1,
+          key2.getKeyArray(), objectInspectors2, columnSortOrderIsDesc);
+    }
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    this.topN = conf.getTopN();
+
+    String columnSortOrder = conf.getColumnSortOrder();
+    boolean[] columnSortOrderIsDesc = new boolean[columnSortOrder.length()];
+    for (int i = 0; i < columnSortOrderIsDesc.length; i++) {
+      columnSortOrderIsDesc[i] = (columnSortOrder.charAt(i) == '-');
+    }
+
+    ObjectInspector rowInspector = inputObjInspectors[0];
+    outputObjInspector = ObjectInspectorUtils.getStandardObjectInspector(rowInspector);
+
+    // init keyFields
+    int numKeys = conf.getKeyColumns().size();
+    ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numKeys];
+    ObjectInspector[] keyObjectInspectors = new ObjectInspector[numKeys];
+    ExprNodeEvaluator[] standardKeyFields = new ExprNodeEvaluator[numKeys];
+    ObjectInspector[] standardKeyObjectInspectors = new ObjectInspector[numKeys];
+
+    for (int i = 0; i < numKeys; i++) {
+      ExprNodeDesc key = conf.getKeyColumns().get(i);
+      keyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
+      keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
+      standardKeyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
+      standardKeyObjectInspectors[i] = standardKeyFields[i].initialize(outputObjInspector);
+    }
+
+    priorityQueue = new PriorityQueue<>(topN + 1, new TopNKeyOperator.KeyWrapperComparator(
+        standardKeyObjectInspectors, standardKeyObjectInspectors, columnSortOrderIsDesc));
+
+    keyWrapper = new KeyWrapperFactory(keyFields, keyObjectInspectors,
+        standardKeyObjectInspectors).getKeyWrapper();
+    standardKeyWrapper = new KeyWrapperFactory(standardKeyFields, standardKeyObjectInspectors,
+        standardKeyObjectInspectors).getKeyWrapper();
+
+    rowLimit = VectorizedRowBatch.DEFAULT_SIZE;
+    rows = new Object[rowLimit];
+    rowSize = 0;
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+    keyWrapper.getNewKey(row, inputObjInspectors[0]);
+    keyWrapper.setHashKey();
+
+    if (!priorityQueue.contains(keyWrapper)) {
+      priorityQueue.offer(keyWrapper.copyKey());
+    }
+    if (priorityQueue.size() > topN) {
+      priorityQueue.poll();
+    }
+
+    rows[rowSize] = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[0]);
+    rowSize++;
+
+    if (rowSize % rowLimit == 0) {
+      processRows();
+    }
+  }
+
+  private void processRows() throws HiveException {
+    for (int i = 0; i < rowSize; i++) {
+      Object row = rows[i];
+
+      standardKeyWrapper.getNewKey(row, outputObjInspector);
+      standardKeyWrapper.setHashKey();
+
+      if (priorityQueue.contains(standardKeyWrapper)) {
+        forward(row, outputObjInspector);
+      }
+    }
+    priorityQueue.clear();
+    rowSize = 0;
+  }
+
+  @Override
+  protected final void closeOp(boolean abort) throws HiveException {
+    processRows();
+    super.closeOp(abort);
+  }
+
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "TNK";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return TOPNKEY;
+  }
+
+  // Because a TopNKeyOperator works like a FilterOperator with top n key condition, its properties
+  // for optimizers has same values. Following methods are same with FilterOperator;
+  // supportSkewJoinOptimization, columnNamesRowResolvedCanBeObtained,
+  // supportAutomaticSortMergeJoin, and supportUnionRemoveOptimization.
+  @Override
+  public boolean supportSkewJoinOptimization() {
+    return true;
+  }
+
+  @Override
+  public boolean columnNamesRowResolvedCanBeObtained() {
+    return true;
+  }
+
+  @Override
+  public boolean supportAutomaticSortMergeJoin() {
+    return true;
+  }
+
+  @Override
+  public boolean supportUnionRemoveOptimization() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
new file mode 100644
index 0000000..6f29f88
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
+
+/**
+ * VectorTopNKeyOperator passes rows that contains top N keys only.
+ */
+public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements VectorizationOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorTopNKeyDesc vectorDesc;
+  private VectorizationContext vContext;
+
+  // Key column info
+  private int[] keyColumnNums;
+  private TypeInfo[] keyTypeInfos;
+
+  // Extract row
+  private transient Object[] singleRow;
+  private transient VectorExtractRow vectorExtractRow;
+
+  // Serialization
+  private transient BinarySortableSerDe binarySortableSerDe;
+  private transient StructObjectInspector keyObjectInspector;
+
+  // Batch processing
+  private transient boolean firstBatch;
+  private transient PriorityQueue<Writable> priorityQueue;
+  private transient int[] temporarySelected;
+
+  public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) {
+
+    this(ctx);
+    this.conf = (TopNKeyDesc) conf;
+    this.vContext = vContext;
+    this.vectorDesc = (VectorTopNKeyDesc) vectorDesc;
+
+    VectorExpression[] keyExpressions = this.vectorDesc.getKeyExpressions();
+    final int numKeys = keyExpressions.length;
+    keyColumnNums = new int[numKeys];
+    keyTypeInfos = new TypeInfo[numKeys];
+
+    for (int i = 0; i < numKeys; i++) {
+      keyColumnNums[i] = keyExpressions[i].getOutputColumnNum();
+      keyTypeInfos[i] = keyExpressions[i].getOutputTypeInfo();
+    }
+  }
+
+  /** Kryo ctor. */
+  @VisibleForTesting
+  public VectorTopNKeyOperator() {
+    super();
+  }
+
+  public VectorTopNKeyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    VectorExpression.doTransientInit(vectorDesc.getKeyExpressions());
+    for (VectorExpression keyExpression : vectorDesc.getKeyExpressions()) {
+      keyExpression.init(hconf);
+    }
+
+    this.firstBatch = true;
+
+    VectorExpression[] keyExpressions = vectorDesc.getKeyExpressions();
+    final int size = keyExpressions.length;
+    ObjectInspector[] fieldObjectInspectors = new ObjectInspector[size];
+
+    for (int i = 0; i < size; i++) {
+      VectorExpression keyExpression = keyExpressions[i];
+      fieldObjectInspectors[i] = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
+          keyExpression.getOutputTypeInfo());
+    }
+
+    keyObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+        this.conf.getKeyColumnNames(), Arrays.asList(fieldObjectInspectors));
+
+    temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
+  }
+
+  @Override
+  public void process(Object data, int tag) throws HiveException {
+    VectorizedRowBatch batch = (VectorizedRowBatch) data;
+
+    // The selected vector represents selected rows.
+    // Clone the selected vector
+    System.arraycopy(batch.selected, 0, temporarySelected, 0, batch.size);
+    int [] selectedBackup = batch.selected;
+    batch.selected = temporarySelected;
+    int sizeBackup = batch.size;
+    boolean selectedInUseBackup = batch.selectedInUse;
+
+    for (VectorExpression keyExpression : vectorDesc.getKeyExpressions()) {
+      keyExpression.evaluate(batch);
+    }
+
+    if (firstBatch) {
+      vectorExtractRow = new VectorExtractRow();
+      vectorExtractRow.init(keyObjectInspector, Ints.asList(keyColumnNums));
+
+      singleRow = new Object[vectorExtractRow.getCount()];
+      Comparator comparator = Comparator.reverseOrder();
+      priorityQueue = new PriorityQueue<Writable>(comparator);
+
+      try {
+        binarySortableSerDe = new BinarySortableSerDe();
+        Properties properties = new Properties();
+        Joiner joiner = Joiner.on(',');
+        properties.setProperty(serdeConstants.LIST_COLUMNS, joiner.join(conf.getKeyColumnNames()));
+        properties.setProperty(serdeConstants.LIST_COLUMN_TYPES, joiner.join(keyTypeInfos));
+        properties.setProperty(serdeConstants.SERIALIZATION_SORT_ORDER,
+            conf.getColumnSortOrder());
+        binarySortableSerDe.initialize(getConfiguration(), properties);
+      } catch (SerDeException e) {
+        throw new HiveException(e);
+      }
+
+      firstBatch = false;
+    }
+
+    // Clear the priority queue
+    priorityQueue.clear();
+
+    // Get top n keys
+    for (int i = 0; i < batch.size; i++) {
+
+      // Get keys
+      int j;
+      if (batch.selectedInUse) {
+        j = batch.selected[i];
+      } else {
+        j = i;
+      }
+      vectorExtractRow.extractRow(batch, j, singleRow);
+
+      Writable keysWritable;
+      try {
+        keysWritable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
+      } catch (SerDeException e) {
+        throw new HiveException(e);
+      }
+
+      // Put the copied keys into the priority queue
+      if (!priorityQueue.contains(keysWritable)) {
+        priorityQueue.offer(WritableUtils.clone(keysWritable, getConfiguration()));
+      }
+
+      // Limit the queue size
+      if (priorityQueue.size() > conf.getTopN()) {
+        priorityQueue.poll();
+      }
+    }
+
+    // Filter rows with top n keys
+    int size = 0;
+    int[] selected = new int[batch.selected.length];
+    for (int i = 0; i < batch.size; i++) {
+      int j;
+      if (batch.selectedInUse) {
+        j = batch.selected[i];
+      } else {
+        j = i;
+      }
+
+      // Get keys
+      vectorExtractRow.extractRow(batch, j, singleRow);
+
+      Writable keysWritable;
+      try {
+        keysWritable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
+      } catch (SerDeException e) {
+        throw new HiveException(e);
+      }
+
+      // Select a row in the priority queue
+      if (priorityQueue.contains(keysWritable)) {
+        selected[size++] = j;
+      }
+    }
+
+    // Apply selection to batch
+    if (batch.size != size) {
+      batch.selectedInUse = true;
+      batch.selected = selected;
+      batch.size = size;
+    }
+
+    // Forward the result
+    if (size > 0) {
+      forward(batch, null, true);
+    }
+
+    // Restore the original selected vector
+    batch.selected = selectedBackup;
+    batch.size = sizeBackup;
+    batch.selectedInUse = selectedInUseBackup;
+  }
+
+  @Override
+  public String getName() {
+    return TopNKeyOperator.getOperatorName();
+  }
+
+  @Override
+  public OperatorType getType() {
+    return TOPNKEY;
+  }
+
+  @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
+
+  // Because a TopNKeyOperator works like a FilterOperator with top n key condition, its properties
+  // for optimizers has same values. Following methods are same with FilterOperator;
+  // supportSkewJoinOptimization, columnNamesRowResolvedCanBeObtained,
+  // supportAutomaticSortMergeJoin, and supportUnionRemoveOptimization.
+  @Override
+  public boolean supportSkewJoinOptimization() {
+    return true;
+  }
+
+  @Override
+  public boolean columnNamesRowResolvedCanBeObtained() {
+    return true;
+  }
+
+  @Override
+  public boolean supportAutomaticSortMergeJoin() {
+    return true;
+  }
+
+  @Override
+  public boolean supportUnionRemoveOptimization() {
+    return true;
+  }
+
+  // Must send on to VectorPTFOperator...
+  @Override
+  public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException {
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+      op.setNextVectorBatchGroupStatus(isLastGroupBatch);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
new file mode 100644
index 0000000..721a9b9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+/**
+ * TopNKeyProcessor is a processor for TopNKeyOperator. A TopNKeyOperator will be placed between
+ * a GroupByOperator and its following ReduceSinkOperator. If there already is a TopNKeyOperator,
+ * then it will be skipped.
+ */
+public class TopNKeyProcessor implements NodeProcessor {
+  private static final Logger LOG = LoggerFactory.getLogger(TopNKeyProcessor.class);
+
+  public TopNKeyProcessor() {
+  }
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+
+    // Get ReduceSinkOperator
+    ReduceSinkOperator reduceSinkOperator = (ReduceSinkOperator) nd;
+    ReduceSinkDesc reduceSinkDesc = reduceSinkOperator.getConf();
+
+    // Get GroupByOperator
+    GroupByOperator groupByOperator = (GroupByOperator) reduceSinkOperator.getParentOperators().get(0);
+    GroupByDesc groupByDesc = groupByOperator.getConf();
+
+    // Check whether the reduce sink operator contains top n
+    if (!reduceSinkDesc.isOrdering() || reduceSinkDesc.getTopN() < 0) {
+      return null;
+    }
+
+    // Check whether the group by operator is in hash mode
+    if (groupByDesc.getMode() != GroupByDesc.Mode.HASH) {
+      return null;
+    }
+
+    // Check whether the group by operator has distinct aggregations
+    if (groupByDesc.isDistinct()) {
+      return null;
+    }
+
+    // Check whether RS keys are same as GBY keys
+    List<ExprNodeDesc> groupByKeyColumns = groupByDesc.getKeys();
+    List<ExprNodeDesc> mappedColumns = new ArrayList<>();
+    for (ExprNodeDesc columns : reduceSinkDesc.getKeyCols()) {
+      mappedColumns.add(groupByDesc.getColumnExprMap().get(columns.getExprString()));
+    }
+    if (!ExprNodeDescUtils.isSame(mappedColumns, groupByKeyColumns)) {
+      return null;
+    }
+
+    // Check whether there already is a top n key operator
+    Operator<? extends OperatorDesc> parentOperator = groupByOperator.getParentOperators().get(0);
+    if (parentOperator instanceof TopNKeyOperator) {
+      return null;
+    }
+
+    // Insert a new top n key operator between the group by operator and its parent
+    TopNKeyDesc topNKeyDesc = new TopNKeyDesc(reduceSinkDesc.getTopN(), reduceSinkDesc.getOrder(),
+        groupByKeyColumns);
+    Operator<? extends OperatorDesc> newOperator = OperatorFactory.getAndMakeChild(
+        groupByOperator.getCompilationOpContext(), (OperatorDesc) topNKeyDesc,
+        new RowSchema(groupByOperator.getSchema()), groupByOperator.getParentOperators());
+    newOperator.getChildOperators().add(groupByOperator);
+    groupByOperator.getParentOperators().add(newOperator);
+    parentOperator.removeChild(groupByOperator);
+
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 7ec80e6..40bd075 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -124,6 +124,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
@@ -135,6 +136,7 @@ import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc;
 import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
 import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.VectorSparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.VectorLimitDesc;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
 import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc;
@@ -2555,6 +2557,10 @@ public class Vectorizer implements PhysicalPlanResolver {
         desc, "Predicate", VectorExpressionDescriptor.Mode.FILTER, /* allowComplex */ true);
   }
 
+  private boolean validateTopNKeyOperator(TopNKeyOperator op) {
+    List<ExprNodeDesc> keyColumns = op.getConf().getKeyColumns();
+    return validateExprNodeDesc(keyColumns, "Key columns");
+  }
 
   private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce,
       boolean isTezOrSpark, VectorGroupByDesc vectorGroupByDesc) {
@@ -4155,6 +4161,20 @@ public class Vectorizer implements PhysicalPlanResolver {
         vContext, vectorFilterDesc);
   }
 
+  private static Operator<? extends OperatorDesc> vectorizeTopNKeyOperator(
+      Operator<? extends OperatorDesc> topNKeyOperator, VectorizationContext vContext,
+      VectorTopNKeyDesc vectorTopNKeyDesc) throws HiveException {
+
+    TopNKeyDesc topNKeyDesc = (TopNKeyDesc) topNKeyOperator.getConf();
+
+    List<ExprNodeDesc> keyColumns = topNKeyDesc.getKeyColumns();
+    VectorExpression[] keyExpressions = vContext.getVectorExpressions(keyColumns);
+    vectorTopNKeyDesc.setKeyExpressions(keyExpressions);
+    return OperatorFactory.getVectorOperator(
+        topNKeyOperator.getCompilationOpContext(), topNKeyDesc,
+        vContext, vectorTopNKeyDesc);
+  }
+
   private static Class<? extends VectorAggregateExpression> findVecAggrClass(
       Class<? extends VectorAggregateExpression>[] vecAggrClasses,
       String aggregateName, ColumnVector.Type inputColVectorType,
@@ -5051,6 +5071,23 @@ public class Vectorizer implements PhysicalPlanResolver {
             }
           }
           break;
+        case TOPNKEY:
+          {
+            if (!validateTopNKeyOperator((TopNKeyOperator) op)) {
+              throw new VectorizerCannotVectorizeException();
+            }
+
+            VectorTopNKeyDesc vectorTopNKeyDesc = new VectorTopNKeyDesc();
+            vectorOp = vectorizeTopNKeyOperator(op, vContext, vectorTopNKeyDesc);
+            isNative = true;
+            if (vectorTaskColumnInfo != null) {
+              VectorExpression[] keyExpressions = vectorTopNKeyDesc.getKeyExpressions();
+              if (usesVectorUDFAdaptor(keyExpressions)) {
+                vectorTaskColumnInfo.setUsesVectorUDFAdaptor(true);
+              }
+            }
+          }
+          break;
         case SELECT:
           {
             if (!validateSelectOperator((SelectOperator) op)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 1b433c7..1661aec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.ForwardWalker;
@@ -77,6 +78,7 @@ import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
 import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer;
+import org.apache.hadoop.hive.ql.optimizer.TopNKeyProcessor;
 import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkJoinDeDuplication;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
@@ -144,6 +146,10 @@ public class TezCompiler extends TaskCompiler {
     OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs);
 
     perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    runTopNKeyOptimization(procCtx);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run top n key optimization");
+
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
     // setup dynamic partition pruning where possible
     runDynamicPartitionPruning(procCtx, inputs, outputs);
     perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Setup dynamic partition pruning");
@@ -1231,6 +1237,27 @@ public class TezCompiler extends TaskCompiler {
     }
   }
 
+  private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx)
+      throws SemanticException {
+    if (!procCtx.conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_TOPNKEY)) {
+      return;
+    }
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(
+        new RuleRegExp("Top n key optimization", GroupByOperator.getOperatorName() + "%" +
+            ReduceSinkOperator.getOperatorName() + "%"),
+        new TopNKeyProcessor());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(procCtx.parseContext.getTopOps().values());
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ogw.startWalking(topNodes, null);
+  }
+
   private boolean findParallelSemiJoinBranch(Operator<?> mapjoin, TableScanOperator bigTableTS,
                                              ParseContext parseContext,
                                              Map<ReduceSinkOperator, TableScanOperator> semijoins) {

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
new file mode 100644
index 0000000..c62c4a9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * TopNKeyDesc.
+ *
+ */
+@Explain(displayName = "Top N Key Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class TopNKeyDesc extends AbstractOperatorDesc {
+  private static final long serialVersionUID = 1L;
+
+  private int topN;
+  private String columnSortOrder;
+  private List<ExprNodeDesc> keyColumns;
+
+  public TopNKeyDesc() {
+  }
+
+  public TopNKeyDesc(
+      final int topN,
+      final String columnSortOrder,
+      final List<ExprNodeDesc> keyColumns) {
+
+    this.topN = topN;
+    this.columnSortOrder = columnSortOrder;
+    this.keyColumns = keyColumns;
+  }
+
+  @Explain(displayName = "top n", explainLevels = { Level.DEFAULT, Level.EXTENDED, Level.USER })
+  public int getTopN() {
+    return topN;
+  }
+
+  public void setTopN(int topN) {
+    this.topN = topN;
+  }
+
+  @Explain(displayName = "sort order", explainLevels = { Level.DEFAULT, Level.EXTENDED, Level.USER })
+  public String getColumnSortOrder() {
+    return columnSortOrder;
+  }
+
+  public void setColumnSortOrder(String columnSortOrder) {
+    this.columnSortOrder = columnSortOrder;
+  }
+
+  @Explain(displayName = "keys")
+  public String getKeyString() {
+    return PlanUtils.getExprListString(keyColumns);
+  }
+
+  @Explain(displayName = "keys", explainLevels = { Level.USER })
+  public String getUserLevelExplainKeyString() {
+    return PlanUtils.getExprListString(keyColumns, true);
+  }
+
+  public List<ExprNodeDesc> getKeyColumns() {
+    return keyColumns;
+  }
+
+  public void setKeyColumns(List<ExprNodeDesc> keyColumns) {
+    this.keyColumns = keyColumns;
+  }
+
+  public List<String> getKeyColumnNames() {
+    List<String> ret = new ArrayList<>();
+    for (ExprNodeDesc keyColumn : keyColumns) {
+      ret.add(keyColumn.getExprString());
+    }
+    return ret;
+  }
+
+  @Override
+  public boolean isSame(OperatorDesc other) {
+    if (getClass().getName().equals(other.getClass().getName())) {
+      TopNKeyDesc otherDesc = (TopNKeyDesc) other;
+      return getTopN() == otherDesc.getTopN() &&
+          Objects.equals(columnSortOrder, otherDesc.columnSortOrder) &&
+          ExprNodeDescUtils.isSame(keyColumns, otherDesc.keyColumns);
+    }
+    return false;
+  }
+
+  @Override
+  public Object clone() {
+    TopNKeyDesc ret = new TopNKeyDesc();
+    ret.setTopN(topN);
+    ret.setColumnSortOrder(columnSortOrder);
+    ret.setKeyColumns(getKeyColumns() == null ? null : new ArrayList<>(getKeyColumns()));
+    return ret;
+  }
+
+  public class TopNKeyDescExplainVectorization extends OperatorExplainVectorization {
+    private final TopNKeyDesc topNKeyDesc;
+    private final VectorTopNKeyDesc vectorTopNKeyDesc;
+
+    public TopNKeyDescExplainVectorization(TopNKeyDesc topNKeyDesc, VectorTopNKeyDesc vectorTopNKeyDesc) {
+      super(vectorTopNKeyDesc, true);
+      this.topNKeyDesc = topNKeyDesc;
+      this.vectorTopNKeyDesc = vectorTopNKeyDesc;
+    }
+
+    @Explain(vectorization = Explain.Vectorization.OPERATOR, displayName = "keyExpressions", explainLevels = { Level.DEFAULT, Level.EXTENDED })
+    public List<String> getKeyExpressions() {
+      return vectorExpressionsToStringList(vectorTopNKeyDesc.getKeyExpressions());
+    }
+  }
+
+  @Explain(vectorization = Explain.Vectorization.OPERATOR, displayName = "Top N Key Vectorization", explainLevels = { Level.DEFAULT, Level.EXTENDED })
+  public TopNKeyDescExplainVectorization getTopNKeyVectorization() {
+    VectorTopNKeyDesc vectorTopNKeyDesc = (VectorTopNKeyDesc) getVectorDesc();
+    if (vectorTopNKeyDesc == null) {
+      return null;
+    }
+    return new TopNKeyDescExplainVectorization(this, vectorTopNKeyDesc);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java
new file mode 100644
index 0000000..9a266a0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+
+public class VectorTopNKeyDesc extends AbstractVectorDesc {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorExpression[] keyExpressions;
+
+  public VectorTopNKeyDesc() {
+  }
+
+  public VectorExpression[] getKeyExpressions() {
+    return keyExpressions;
+  }
+
+  public void setKeyExpressions(VectorExpression[] keyExpressions) {
+    this.keyExpressions = keyExpressions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/test/queries/clientpositive/topnkey.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/topnkey.q b/ql/src/test/queries/clientpositive/topnkey.q
new file mode 100644
index 0000000..e02a41d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/topnkey.q
@@ -0,0 +1,31 @@
+--! qt:dataset:src
+set hive.mapred.mode=nonstrict;
+set hive.vectorized.execution.enabled=false;
+set hive.optimize.topnkey=true;
+
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.tez.min.bloom.filter.entries=1;
+
+set hive.tez.dynamic.partition.pruning=true;
+set hive.stats.fetch.column.stats=true;
+set hive.cbo.enable=true;
+
+EXPLAIN
+SELECT key, SUM(CAST(SUBSTR(value,5) AS INT)) FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+SELECT key, SUM(CAST(SUBSTR(value,5) AS INT)) FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+EXPLAIN
+SELECT key FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+SELECT key FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+explain vectorization detail
+SELECT src1.key, src2.value FROM src src1 JOIN src src2 ON (src1.key = src2.key) ORDER BY src1.key LIMIT 5;
+
+SELECT src1.key, src2.value FROM src src1 JOIN src src2 ON (src1.key = src2.key) ORDER BY src1.key LIMIT 5;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/test/queries/clientpositive/vector_topnkey.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vector_topnkey.q b/ql/src/test/queries/clientpositive/vector_topnkey.q
new file mode 100644
index 0000000..e1b7d26
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/vector_topnkey.q
@@ -0,0 +1,30 @@
+--! qt:dataset:src
+set hive.mapred.mode=nonstrict;
+set hive.vectorized.execution.enabled=true;
+set hive.optimize.topnkey=true;
+
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.tez.min.bloom.filter.entries=1;
+
+set hive.tez.dynamic.partition.pruning=true;
+set hive.stats.fetch.column.stats=true;
+set hive.cbo.enable=true;
+
+explain vectorization detail
+SELECT key, SUM(CAST(SUBSTR(value,5) AS INT)) FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+SELECT key, SUM(CAST(SUBSTR(value,5) AS INT)) FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+explain vectorization detail
+SELECT key FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+SELECT key FROM src GROUP BY key ORDER BY key LIMIT 5;
+
+explain vectorization detail
+SELECT src1.key, src2.value FROM src src1 JOIN src src2 ON (src1.key = src2.key) ORDER BY src1.key LIMIT 5;
+
+SELECT src1.key, src2.value FROM src src1 JOIN src src2 ON (src1.key = src2.key) ORDER BY src1.key LIMIT 5;

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out b/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out
index bee7889..726d46b 100644
--- a/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out
+++ b/ql/src/test/results/clientpositive/llap/bucket_groupby.q.out
@@ -68,19 +68,24 @@ STAGE PLANS:
                     expressions: key (type: string)
                     outputColumnNames: key
                     Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
+                    Top N Key Operator
+                      sort order: +
                       keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        keys: key (type: string)
+                        mode: hash
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col1 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: string)
+                          sort order: +
+                          Map-reduce partition columns: _col0 (type: string)
+                          Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 
@@ -196,19 +201,24 @@ STAGE PLANS:
                     expressions: key (type: string)
                     outputColumnNames: key
                     Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
+                    Top N Key Operator
+                      sort order: +
                       keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        keys: key (type: string)
+                        mode: hash
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col1 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: string)
+                          sort order: +
+                          Map-reduce partition columns: _col0 (type: string)
+                          Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 
@@ -298,19 +308,24 @@ STAGE PLANS:
                     expressions: length(key) (type: int)
                     outputColumnNames: _col0
                     Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
+                    Top N Key Operator
+                      sort order: +
                       keys: _col0 (type: int)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        keys: _col0 (type: int)
+                        mode: hash
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col1 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: int)
+                          sort order: +
+                          Map-reduce partition columns: _col0 (type: int)
+                          Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 
@@ -380,19 +395,24 @@ STAGE PLANS:
                     expressions: abs(length(key)) (type: int)
                     outputColumnNames: _col0
                     Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
+                    Top N Key Operator
+                      sort order: +
                       keys: _col0 (type: int)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        keys: _col0 (type: int)
+                        mode: hash
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col1 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: int)
+                          sort order: +
+                          Map-reduce partition columns: _col0 (type: int)
+                          Statistics: Num rows: 250 Data size: 3000 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 
@@ -463,19 +483,24 @@ STAGE PLANS:
                     expressions: key (type: string)
                     outputColumnNames: key
                     Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
+                    Top N Key Operator
+                      sort order: +
                       keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        keys: key (type: string)
+                        mode: hash
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col1 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: string)
+                          sort order: +
+                          Map-reduce partition columns: _col0 (type: string)
+                          Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 
@@ -566,19 +591,24 @@ STAGE PLANS:
                     expressions: value (type: string)
                     outputColumnNames: value
                     Statistics: Num rows: 500 Data size: 45500 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
+                    Top N Key Operator
+                      sort order: +
                       keys: value (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 500 Data size: 45500 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        keys: value (type: string)
+                        mode: hash
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col1 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: string)
+                          sort order: +
+                          Map-reduce partition columns: _col0 (type: string)
+                          Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 
@@ -1167,20 +1197,25 @@ STAGE PLANS:
                     expressions: key (type: string)
                     outputColumnNames: key
                     Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
-                      bucketGroup: true
+                    Top N Key Operator
+                      sort order: +
                       keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        bucketGroup: true
+                        keys: key (type: string)
+                        mode: hash
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col1 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: string)
+                          sort order: +
+                          Map-reduce partition columns: _col0 (type: string)
+                          Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 
@@ -1271,19 +1306,24 @@ STAGE PLANS:
                     expressions: value (type: string)
                     outputColumnNames: value
                     Statistics: Num rows: 500 Data size: 45500 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
+                    Top N Key Operator
+                      sort order: +
                       keys: value (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 500 Data size: 45500 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        keys: value (type: string)
+                        mode: hash
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col1 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: string)
+                          sort order: +
+                          Map-reduce partition columns: _col0 (type: string)
+                          Statistics: Num rows: 250 Data size: 24750 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 
@@ -1475,20 +1515,25 @@ STAGE PLANS:
                     expressions: key (type: string)
                     outputColumnNames: key
                     Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
-                      bucketGroup: true
+                    Top N Key Operator
+                      sort order: +
                       keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        bucketGroup: true
+                        keys: key (type: string)
+                        mode: hash
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col1 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: string)
+                          sort order: +
+                          Map-reduce partition columns: _col0 (type: string)
+                          Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 
@@ -1579,19 +1624,24 @@ STAGE PLANS:
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: key, value
                     Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
+                    Top N Key Operator
+                      sort order: ++
                       keys: key (type: string), value (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1, _col2
-                      Statistics: Num rows: 250 Data size: 46500 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string), _col1 (type: string)
-                        sort order: ++
-                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: count()
+                        keys: key (type: string), value (type: string)
+                        mode: hash
+                        outputColumnNames: _col0, _col1, _col2
                         Statistics: Num rows: 250 Data size: 46500 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col2 (type: bigint)
+                        Reduce Output Operator
+                          key expressions: _col0 (type: string), _col1 (type: string)
+                          sort order: ++
+                          Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                          Statistics: Num rows: 250 Data size: 46500 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col2 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 2 

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/test/results/clientpositive/llap/check_constraint.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/check_constraint.q.out b/ql/src/test/results/clientpositive/llap/check_constraint.q.out
index e4cd97e..123a3e4 100644
--- a/ql/src/test/results/clientpositive/llap/check_constraint.q.out
+++ b/ql/src/test/results/clientpositive/llap/check_constraint.q.out
@@ -1675,19 +1675,24 @@ STAGE PLANS:
                     expressions: key (type: string), value (type: string), UDFToInteger(key) (type: int), CAST( key AS decimal(5,2)) (type: decimal(5,2))
                     outputColumnNames: _col0, _col1, _col2, _col3
                     Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: min(_col2), max(_col3)
+                    Top N Key Operator
+                      sort order: ++
                       keys: _col0 (type: string), _col1 (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1, _col2, _col3
-                      Statistics: Num rows: 250 Data size: 73500 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string), _col1 (type: string)
-                        sort order: ++
-                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      top n: 10
+                      Group By Operator
+                        aggregations: min(_col2), max(_col3)
+                        keys: _col0 (type: string), _col1 (type: string)
+                        mode: hash
+                        outputColumnNames: _col0, _col1, _col2, _col3
                         Statistics: Num rows: 250 Data size: 73500 Basic stats: COMPLETE Column stats: COMPLETE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col2 (type: int), _col3 (type: decimal(5,2))
+                        Reduce Output Operator
+                          key expressions: _col0 (type: string), _col1 (type: string)
+                          sort order: ++
+                          Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                          Statistics: Num rows: 250 Data size: 73500 Basic stats: COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col2 (type: int), _col3 (type: decimal(5,2))
             Execution mode: vectorized, llap
             LLAP IO: no inputs
         Reducer 2 

http://git-wip-us.apache.org/repos/asf/hive/blob/851c8aba/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
index 6a2ae62..f9018b4 100644
--- a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
+++ b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
@@ -1264,19 +1264,21 @@ Stage-0
                   PartitionCols:_col0, _col1
                   Group By Operator [GBY_7] (rows=5 width=20)
                     Output:["_col0","_col1","_col2"],aggregations:["count()"],keys:_col1, _col0
-                    Select Operator [SEL_5] (rows=10 width=101)
-                      Output:["_col0","_col1"]
-                      Group By Operator [GBY_4] (rows=10 width=101)
-                        Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
-                      <-Map 1 [SIMPLE_EDGE] llap
-                        SHUFFLE [RS_3]
-                          PartitionCols:_col0, _col1, _col2
-                          Group By Operator [GBY_2] (rows=10 width=101)
-                            Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
-                            Select Operator [SEL_1] (rows=20 width=88)
-                              Output:["key","c_int","c_float"]
-                              TableScan [TS_0] (rows=20 width=88)
-                                default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
+                    Top N Key Operator [TNK_15] (rows=10 width=101)
+                      keys:_col1, _col0,sort order:++,top n:1
+                      Select Operator [SEL_5] (rows=10 width=101)
+                        Output:["_col0","_col1"]
+                        Group By Operator [GBY_4] (rows=10 width=101)
+                          Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
+                        <-Map 1 [SIMPLE_EDGE] llap
+                          SHUFFLE [RS_3]
+                            PartitionCols:_col0, _col1, _col2
+                            Group By Operator [GBY_2] (rows=10 width=101)
+                              Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
+                              Select Operator [SEL_1] (rows=20 width=88)
+                                Output:["key","c_int","c_float"]
+                                TableScan [TS_0] (rows=20 width=88)
+                                  default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
 
 PREHOOK: query: explain select key from(select key from (select key from cbo_t1 limit 5)cbo_t2  limit 5)cbo_t3  limit 5
 PREHOOK: type: QUERY