You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2019/11/28 05:52:41 UTC

[hive] branch master updated: HIVE-22532: PTFPPD may push limit incorrectly through Rank/DenseRank function (Jesus Camacho Rodriguez, reviewed by Zoltan Haindrich)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 520aa19  HIVE-22532: PTFPPD may push limit incorrectly through Rank/DenseRank function (Jesus Camacho Rodriguez, reviewed by Zoltan Haindrich)
520aa19 is described below

commit 520aa19b20381bfd2ed25c835443c013f6e6ebb9
Author: Jesus Camacho Rodriguez <jc...@apache.org>
AuthorDate: Sat Nov 23 22:36:29 2019 -0800

    HIVE-22532: PTFPPD may push limit incorrectly through Rank/DenseRank function (Jesus Camacho Rodriguez, reviewed by Zoltan Haindrich)
    
    Close apache/hive#851
---
 .../test/resources/testconfiguration.properties    |   1 +
 .../hive/ql/optimizer/LimitPushdownOptimizer.java  |   4 +
 .../hadoop/hive/ql/optimizer/TopNKeyProcessor.java |   6 +
 .../apache/hadoop/hive/ql/ppd/OpProcFactory.java   |   2 +-
 .../test/queries/clientpositive/windowing_filter.q |  37 ++++
 .../clientpositive/llap/windowing_filter.q.out     | 205 +++++++++++++++++++++
 .../clientpositive/perf/spark/query70.q.out        |   1 -
 .../perf/tez/constraints/query70.q.out             | 106 ++++++-----
 .../results/clientpositive/perf/tez/query70.q.out  | 142 +++++++-------
 9 files changed, 376 insertions(+), 128 deletions(-)

diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 023dbb9..2aa9043 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -905,6 +905,7 @@ minillaplocal.query.files=\
   vectorized_ptf.q,\
   windowing.q,\
   windowing_gby.q,\
+  windowing_filter.q,\
   unionDistinct_2.q,\
   auto_smb_mapjoin_14.q,\
   subquery_views.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
index 6cea72f..59ca3f7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
@@ -168,6 +168,10 @@ public class LimitPushdownOptimizer extends Transform {
         // No limit, nothing to propagate, we just bail out
         return false;
       }
+      if (cRS.getConf().isPTFReduceSink()) {
+        // Pushing a limit per partition key is not supported yet
+        return false;
+      }
       ReduceSinkOperator pRS = null;
       for (int i = stack.size() - 2 ; i >= 0; i--) {
         Operator<?> operator = (Operator<?>) stack.get(i);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
index 4b4cf99..0d6cf3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
@@ -68,6 +68,12 @@ public class TopNKeyProcessor implements NodeProcessor {
       return null;
     }
 
+    // Per-partition top-n keys are currently not supported
+    // by the TopNKey operator
+    if (reduceSinkDesc.isPTFReduceSink()) {
+      return null;
+    }
+
     // Check whether the group by operator is in hash mode
     if (groupByDesc.getMode() != GroupByDesc.Mode.HASH) {
       return null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
index efbd858..38f66f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
@@ -217,6 +217,7 @@ public final class OpProcFactory {
       }
 
       WindowTableFunctionDef wTFn = (WindowTableFunctionDef) conf.getFuncDef();
+
       List<Integer> rFnIdxs = rankingFunctions(wTFn);
 
       if ( rFnIdxs.size() == 0 ) {
@@ -325,7 +326,6 @@ public final class OpProcFactory {
      * reference rows past the Current Row.
      */
     private boolean canPushLimitToReduceSink(WindowTableFunctionDef wTFn) {
-
       for(WindowFunctionDef wFnDef : wTFn.getWindowFunctions() ) {
         if ( (wFnDef.getWFnEval() instanceof GenericUDAFRankEvaluator) ||
             (wFnDef.getWFnEval() instanceof GenericUDAFDenseRankEvaluator )  ||
diff --git a/ql/src/test/queries/clientpositive/windowing_filter.q b/ql/src/test/queries/clientpositive/windowing_filter.q
new file mode 100644
index 0000000..2483c18
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/windowing_filter.q
@@ -0,0 +1,37 @@
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=1431655765;
+
+
+create table testtable_n1000 (s_state string, ss_net_profit double);
+insert into testtable_n1000 values
+  ('AA', 101),
+  ('AB', 102),
+  ('AC', 103),
+  ('AD', 104),
+  ('AE', 105),
+  ('AF', 106),
+  ('AG', 107),
+  ('AH', 108),
+  ('AI', 109),
+  ('AJ', 110);
+
+explain
+select s_state, ranking
+from (
+  select s_state as s_state,
+    sum(ss_net_profit),
+    rank() over ( partition by s_state order by sum(ss_net_profit) desc) as ranking
+  from testtable_n1000
+  group by s_state) tmp1
+where ranking <= 5;
+
+select s_state, ranking
+from (
+  select s_state as s_state,
+    sum(ss_net_profit),
+    rank() over ( partition by s_state order by sum(ss_net_profit) desc) as ranking
+  from testtable_n1000
+  group by s_state) tmp1
+where ranking <= 5;
+
+drop table testtable_n1000;
diff --git a/ql/src/test/results/clientpositive/llap/windowing_filter.q.out b/ql/src/test/results/clientpositive/llap/windowing_filter.q.out
new file mode 100644
index 0000000..78240b5
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/windowing_filter.q.out
@@ -0,0 +1,205 @@
+PREHOOK: query: create table testtable_n1000 (s_state string, ss_net_profit double)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@testtable_n1000
+POSTHOOK: query: create table testtable_n1000 (s_state string, ss_net_profit double)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@testtable_n1000
+PREHOOK: query: insert into testtable_n1000 values
+  ('AA', 101),
+  ('AB', 102),
+  ('AC', 103),
+  ('AD', 104),
+  ('AE', 105),
+  ('AF', 106),
+  ('AG', 107),
+  ('AH', 108),
+  ('AI', 109),
+  ('AJ', 110)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@testtable_n1000
+POSTHOOK: query: insert into testtable_n1000 values
+  ('AA', 101),
+  ('AB', 102),
+  ('AC', 103),
+  ('AD', 104),
+  ('AE', 105),
+  ('AF', 106),
+  ('AG', 107),
+  ('AH', 108),
+  ('AI', 109),
+  ('AJ', 110)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@testtable_n1000
+POSTHOOK: Lineage: testtable_n1000.s_state SCRIPT []
+POSTHOOK: Lineage: testtable_n1000.ss_net_profit SCRIPT []
+PREHOOK: query: explain
+select s_state, ranking
+from (
+  select s_state as s_state,
+    sum(ss_net_profit),
+    rank() over ( partition by s_state order by sum(ss_net_profit) desc) as ranking
+  from testtable_n1000
+  group by s_state) tmp1
+where ranking <= 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testtable_n1000
+#### A masked pattern was here ####
+POSTHOOK: query: explain
+select s_state, ranking
+from (
+  select s_state as s_state,
+    sum(ss_net_profit),
+    rank() over ( partition by s_state order by sum(ss_net_profit) desc) as ranking
+  from testtable_n1000
+  group by s_state) tmp1
+where ranking <= 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testtable_n1000
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: testtable_n1000
+                  Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: s_state (type: string), ss_net_profit (type: double)
+                    outputColumnNames: s_state, ss_net_profit
+                    Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      aggregations: sum(ss_net_profit)
+                      keys: s_state (type: string)
+                      minReductionHashAggr: 0.5
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 5 Data size: 470 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        null sort order: a
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 5 Data size: 470 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: double)
+            Execution mode: vectorized, llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 5 Data size: 470 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string), _col1 (type: double)
+                  null sort order: az
+                  sort order: +-
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 5 Data size: 470 Basic stats: COMPLETE Column stats: COMPLETE
+                  TopN Hash Memory Usage: 0.1
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: double)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 5 Data size: 470 Basic stats: COMPLETE Column stats: COMPLETE
+                PTF Operator
+                  Function definitions:
+                      Input definition
+                        input alias: ptf_0
+                        output shape: _col0: string, _col1: double
+                        type: WINDOWING
+                      Windowing table definition
+                        input alias: ptf_1
+                        name: windowingtablefunction
+                        order by: _col1 DESC NULLS LAST
+                        partition by: _col0
+                        raw input shape:
+                        window functions:
+                            window function definition
+                              alias: rank_window_0
+                              arguments: _col1
+                              name: rank
+                              window function: GenericUDAFRankEvaluator
+                              window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
+                              isPivotResult: true
+                  Statistics: Num rows: 5 Data size: 470 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (rank_window_0 <= 5) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 94 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: _col0 (type: string), rank_window_0 (type: int)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 90 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 1 Data size: 90 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select s_state, ranking
+from (
+  select s_state as s_state,
+    sum(ss_net_profit),
+    rank() over ( partition by s_state order by sum(ss_net_profit) desc) as ranking
+  from testtable_n1000
+  group by s_state) tmp1
+where ranking <= 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testtable_n1000
+#### A masked pattern was here ####
+POSTHOOK: query: select s_state, ranking
+from (
+  select s_state as s_state,
+    sum(ss_net_profit),
+    rank() over ( partition by s_state order by sum(ss_net_profit) desc) as ranking
+  from testtable_n1000
+  group by s_state) tmp1
+where ranking <= 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testtable_n1000
+#### A masked pattern was here ####
+AA	1
+AB	1
+AC	1
+AD	1
+AE	1
+AF	1
+AG	1
+AH	1
+AI	1
+AJ	1
+PREHOOK: query: drop table testtable_n1000
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@testtable_n1000
+PREHOOK: Output: default@testtable_n1000
+POSTHOOK: query: drop table testtable_n1000
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@testtable_n1000
+POSTHOOK: Output: default@testtable_n1000
diff --git a/ql/src/test/results/clientpositive/perf/spark/query70.q.out b/ql/src/test/results/clientpositive/perf/spark/query70.q.out
index e43bd33..1721e18 100644
--- a/ql/src/test/results/clientpositive/perf/spark/query70.q.out
+++ b/ql/src/test/results/clientpositive/perf/spark/query70.q.out
@@ -267,7 +267,6 @@ STAGE PLANS:
                       sort order: +
                       Map-reduce partition columns: _col0 (type: string)
                       Statistics: Num rows: 696954748 Data size: 61485550191 Basic stats: COMPLETE Column stats: NONE
-                      TopN Hash Memory Usage: 0.1
                       value expressions: _col1 (type: decimal(17,2))
         Reducer 11 
             Execution mode: vectorized
diff --git a/ql/src/test/results/clientpositive/perf/tez/constraints/query70.q.out b/ql/src/test/results/clientpositive/perf/tez/constraints/query70.q.out
index 6fc46fb..b209ccd 100644
--- a/ql/src/test/results/clientpositive/perf/tez/constraints/query70.q.out
+++ b/ql/src/test/results/clientpositive/perf/tez/constraints/query70.q.out
@@ -100,26 +100,26 @@ Stage-0
     limit:-1
     Stage-1
       Reducer 6 vectorized
-      File Output Operator [FS_171]
-        Limit [LIM_170] (rows=100 width=492)
+      File Output Operator [FS_170]
+        Limit [LIM_169] (rows=100 width=492)
           Number of rows:100
-          Select Operator [SEL_169] (rows=720 width=492)
+          Select Operator [SEL_168] (rows=720 width=492)
             Output:["_col0","_col1","_col2","_col3","_col4"]
           <-Reducer 5 [SIMPLE_EDGE] vectorized
-            SHUFFLE [RS_168]
+            SHUFFLE [RS_167]
               null sort order:zzz,sort order:-++
-              Select Operator [SEL_167] (rows=720 width=492)
+              Select Operator [SEL_166] (rows=720 width=492)
                 Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-                PTF Operator [PTF_166] (rows=720 width=304)
+                PTF Operator [PTF_165] (rows=720 width=304)
                   Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col2 DESC NULLS LAST","partition by:":"(grouping(_col3, 1L) + grouping(_col3, 0L)), CASE WHEN ((grouping(_col3, 0L) = UDFToLong(0))) THEN (_col0) ELSE (CAST( null AS STRING)) END"}]
-                  Select Operator [SEL_165] (rows=720 width=304)
+                  Select Operator [SEL_164] (rows=720 width=304)
                     Output:["_col0","_col1","_col2","_col3"]
                   <-Reducer 4 [SIMPLE_EDGE] vectorized
-                    SHUFFLE [RS_164]
+                    SHUFFLE [RS_163]
                       PartitionCols:(grouping(_col3, 1L) + grouping(_col3, 0L)), CASE WHEN ((grouping(_col3, 0L) = UDFToLong(0))) THEN (_col0) ELSE (CAST( null AS STRING)) END,null sort order:aaz,sort order:++-
-                      Select Operator [SEL_163] (rows=720 width=304)
+                      Select Operator [SEL_162] (rows=720 width=304)
                         Output:["_col0","_col1","_col2","_col3"]
-                        Group By Operator [GBY_162] (rows=720 width=304)
+                        Group By Operator [GBY_161] (rows=720 width=304)
                           Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
                         <-Reducer 3 [SIMPLE_EDGE]
                           SHUFFLE [RS_50]
@@ -128,92 +128,90 @@ Stage-0
                               Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(_col2)"],keys:_col0, _col1, 0L
                               Select Operator [SEL_47] (rows=525329897 width=290)
                                 Output:["_col0","_col1","_col2"]
-                                Merge Join Operator [MERGEJOIN_137] (rows=525329897 width=290)
+                                Merge Join Operator [MERGEJOIN_136] (rows=525329897 width=290)
                                   Conds:RS_44._col1=RS_45._col0(Inner),Output:["_col2","_col6","_col7"]
                                 <-Reducer 2 [SIMPLE_EDGE]
                                   SHUFFLE [RS_44]
                                     PartitionCols:_col1,null sort order:a,sort order:+
-                                    Merge Join Operator [MERGEJOIN_133] (rows=525329897 width=110)
-                                      Conds:RS_148._col0=RS_140._col0(Inner),Output:["_col1","_col2"]
+                                    Merge Join Operator [MERGEJOIN_132] (rows=525329897 width=110)
+                                      Conds:RS_147._col0=RS_139._col0(Inner),Output:["_col1","_col2"]
                                     <-Map 11 [SIMPLE_EDGE] vectorized
-                                      SHUFFLE [RS_140]
+                                      SHUFFLE [RS_139]
                                         PartitionCols:_col0,null sort order:a,sort order:+
-                                        Select Operator [SEL_139] (rows=317 width=8)
+                                        Select Operator [SEL_138] (rows=317 width=8)
                                           Output:["_col0"]
-                                          Filter Operator [FIL_138] (rows=317 width=8)
+                                          Filter Operator [FIL_137] (rows=317 width=8)
                                             predicate:d_month_seq BETWEEN 1212 AND 1223
                                             TableScan [TS_3] (rows=73049 width=8)
                                               default@date_dim,d1,Tbl:COMPLETE,Col:COMPLETE,Output:["d_date_sk","d_month_seq"]
                                     <-Map 1 [SIMPLE_EDGE] vectorized
-                                      SHUFFLE [RS_148]
+                                      SHUFFLE [RS_147]
                                         PartitionCols:_col0,null sort order:a,sort order:+
-                                        Select Operator [SEL_147] (rows=525329897 width=114)
+                                        Select Operator [SEL_146] (rows=525329897 width=114)
                                           Output:["_col0","_col1","_col2"]
-                                          Filter Operator [FIL_146] (rows=525329897 width=114)
+                                          Filter Operator [FIL_145] (rows=525329897 width=114)
                                             predicate:(ss_sold_date_sk is not null and ss_store_sk is not null and ss_sold_date_sk BETWEEN DynamicValue(RS_42_d1_d_date_sk_min) AND DynamicValue(RS_42_d1_d_date_sk_max) and in_bloom_filter(ss_sold_date_sk, DynamicValue(RS_42_d1_d_date_sk_bloom_filter)))
                                             TableScan [TS_0] (rows=575995635 width=114)
                                               default@store_sales,store_sales,Tbl:COMPLETE,Col:COMPLETE,Output:["ss_sold_date_sk","ss_store_sk","ss_net_profit"]
                                             <-Reducer 12 [BROADCAST_EDGE] vectorized
-                                              BROADCAST [RS_145]
-                                                Group By Operator [GBY_144] (rows=1 width=12)
+                                              BROADCAST [RS_144]
+                                                Group By Operator [GBY_143] (rows=1 width=12)
                                                   Output:["_col0","_col1","_col2"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","bloom_filter(VALUE._col2, expectedEntries=1000000)"]
                                                 <-Map 11 [CUSTOM_SIMPLE_EDGE] vectorized
-                                                  SHUFFLE [RS_143]
-                                                    Group By Operator [GBY_142] (rows=1 width=12)
+                                                  SHUFFLE [RS_142]
+                                                    Group By Operator [GBY_141] (rows=1 width=12)
                                                       Output:["_col0","_col1","_col2"],aggregations:["min(_col0)","max(_col0)","bloom_filter(_col0, expectedEntries=1000000)"]
-                                                      Select Operator [SEL_141] (rows=317 width=4)
+                                                      Select Operator [SEL_140] (rows=317 width=4)
                                                         Output:["_col0"]
-                                                         Please refer to the previous Select Operator [SEL_139]
+                                                         Please refer to the previous Select Operator [SEL_138]
                                 <-Reducer 10 [SIMPLE_EDGE]
                                   SHUFFLE [RS_45]
                                     PartitionCols:_col0,null sort order:a,sort order:+
-                                    Merge Join Operator [MERGEJOIN_136] (rows=556 width=188)
-                                      Conds:RS_161._col2=RS_158._col0(Inner),Output:["_col0","_col1","_col2"]
+                                    Merge Join Operator [MERGEJOIN_135] (rows=556 width=188)
+                                      Conds:RS_160._col2=RS_157._col0(Inner),Output:["_col0","_col1","_col2"]
                                     <-Map 13 [SIMPLE_EDGE] vectorized
-                                      SHUFFLE [RS_161]
+                                      SHUFFLE [RS_160]
                                         PartitionCols:_col2,null sort order:a,sort order:+
-                                        Select Operator [SEL_160] (rows=1704 width=188)
+                                        Select Operator [SEL_159] (rows=1704 width=188)
                                           Output:["_col0","_col1","_col2"]
-                                          Filter Operator [FIL_159] (rows=1704 width=188)
+                                          Filter Operator [FIL_158] (rows=1704 width=188)
                                             predicate:s_state is not null
                                             TableScan [TS_6] (rows=1704 width=188)
                                               default@store,store,Tbl:COMPLETE,Col:COMPLETE,Output:["s_store_sk","s_county","s_state"]
                                     <-Reducer 9 [SIMPLE_EDGE] vectorized
-                                      SHUFFLE [RS_158]
+                                      SHUFFLE [RS_157]
                                         PartitionCols:_col0,null sort order:a,sort order:+
-                                        Select Operator [SEL_157] (rows=16 width=86)
+                                        Select Operator [SEL_156] (rows=16 width=86)
                                           Output:["_col0"]
-                                          Filter Operator [FIL_156] (rows=16 width=198)
+                                          Filter Operator [FIL_155] (rows=16 width=198)
                                             predicate:(rank_window_0 <= 5)
-                                            PTF Operator [PTF_155] (rows=49 width=198)
+                                            PTF Operator [PTF_154] (rows=49 width=198)
                                               Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col1 DESC NULLS LAST","partition by:":"_col0"}]
-                                              Select Operator [SEL_154] (rows=49 width=198)
+                                              Select Operator [SEL_153] (rows=49 width=198)
                                                 Output:["_col0","_col1"]
                                               <-Reducer 8 [SIMPLE_EDGE] vectorized
-                                                SHUFFLE [RS_153]
+                                                SHUFFLE [RS_152]
                                                   PartitionCols:_col0,null sort order:az,sort order:+-
-                                                  Group By Operator [GBY_152] (rows=49 width=198)
+                                                  Group By Operator [GBY_151] (rows=49 width=198)
                                                     Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
                                                   <-Reducer 7 [SIMPLE_EDGE]
                                                     SHUFFLE [RS_26]
                                                       PartitionCols:_col0,null sort order:a,sort order:+
                                                       Group By Operator [GBY_25] (rows=19404 width=198)
                                                         Output:["_col0","_col1"],aggregations:["sum(_col2)"],keys:_col5
-                                                        Top N Key Operator [TNK_87] (rows=525329897 width=192)
-                                                          keys:_col5,null sort order:a,sort order:+,top n:6
-                                                          Merge Join Operator [MERGEJOIN_135] (rows=525329897 width=192)
-                                                            Conds:RS_21._col1=RS_151._col0(Inner),Output:["_col2","_col5"]
-                                                          <-Reducer 2 [SIMPLE_EDGE]
-                                                            SHUFFLE [RS_21]
-                                                              PartitionCols:_col1,null sort order:a,sort order:+
-                                                               Please refer to the previous Merge Join Operator [MERGEJOIN_133]
-                                                          <-Map 14 [SIMPLE_EDGE] vectorized
-                                                            SHUFFLE [RS_151]
-                                                              PartitionCols:_col0,null sort order:a,sort order:+
-                                                              Select Operator [SEL_150] (rows=1704 width=90)
-                                                                Output:["_col0","_col1"]
-                                                                Filter Operator [FIL_149] (rows=1704 width=90)
-                                                                  predicate:s_state is not null
-                                                                  TableScan [TS_15] (rows=1704 width=90)
-                                                                    default@store,store,Tbl:COMPLETE,Col:COMPLETE,Output:["s_store_sk","s_state"]
+                                                        Merge Join Operator [MERGEJOIN_134] (rows=525329897 width=192)
+                                                          Conds:RS_21._col1=RS_150._col0(Inner),Output:["_col2","_col5"]
+                                                        <-Reducer 2 [SIMPLE_EDGE]
+                                                          SHUFFLE [RS_21]
+                                                            PartitionCols:_col1,null sort order:a,sort order:+
+                                                             Please refer to the previous Merge Join Operator [MERGEJOIN_132]
+                                                        <-Map 14 [SIMPLE_EDGE] vectorized
+                                                          SHUFFLE [RS_150]
+                                                            PartitionCols:_col0,null sort order:a,sort order:+
+                                                            Select Operator [SEL_149] (rows=1704 width=90)
+                                                              Output:["_col0","_col1"]
+                                                              Filter Operator [FIL_148] (rows=1704 width=90)
+                                                                predicate:s_state is not null
+                                                                TableScan [TS_15] (rows=1704 width=90)
+                                                                  default@store,store,Tbl:COMPLETE,Col:COMPLETE,Output:["s_store_sk","s_state"]
 
diff --git a/ql/src/test/results/clientpositive/perf/tez/query70.q.out b/ql/src/test/results/clientpositive/perf/tez/query70.q.out
index 26b1ca1..171037e 100644
--- a/ql/src/test/results/clientpositive/perf/tez/query70.q.out
+++ b/ql/src/test/results/clientpositive/perf/tez/query70.q.out
@@ -100,26 +100,26 @@ Stage-0
     limit:-1
     Stage-1
       Reducer 7 vectorized
-      File Output Operator [FS_169]
-        Limit [LIM_168] (rows=100 width=492)
+      File Output Operator [FS_168]
+        Limit [LIM_167] (rows=100 width=492)
           Number of rows:100
-          Select Operator [SEL_167] (rows=720 width=492)
+          Select Operator [SEL_166] (rows=720 width=492)
             Output:["_col0","_col1","_col2","_col3","_col4"]
           <-Reducer 6 [SIMPLE_EDGE] vectorized
-            SHUFFLE [RS_166]
+            SHUFFLE [RS_165]
               null sort order:zzz,sort order:-++
-              Select Operator [SEL_165] (rows=720 width=492)
+              Select Operator [SEL_164] (rows=720 width=492)
                 Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-                PTF Operator [PTF_164] (rows=720 width=304)
+                PTF Operator [PTF_163] (rows=720 width=304)
                   Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col2 DESC NULLS LAST","partition by:":"(grouping(_col3, 1L) + grouping(_col3, 0L)), CASE WHEN ((grouping(_col3, 0L) = UDFToLong(0))) THEN (_col0) ELSE (CAST( null AS STRING)) END"}]
-                  Select Operator [SEL_163] (rows=720 width=304)
+                  Select Operator [SEL_162] (rows=720 width=304)
                     Output:["_col0","_col1","_col2","_col3"]
                   <-Reducer 5 [SIMPLE_EDGE] vectorized
-                    SHUFFLE [RS_162]
+                    SHUFFLE [RS_161]
                       PartitionCols:(grouping(_col3, 1L) + grouping(_col3, 0L)), CASE WHEN ((grouping(_col3, 0L) = UDFToLong(0))) THEN (_col0) ELSE (CAST( null AS STRING)) END,null sort order:aaz,sort order:++-
-                      Select Operator [SEL_161] (rows=720 width=304)
+                      Select Operator [SEL_160] (rows=720 width=304)
                         Output:["_col0","_col1","_col2","_col3"]
-                        Group By Operator [GBY_160] (rows=720 width=304)
+                        Group By Operator [GBY_159] (rows=720 width=304)
                           Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
                         <-Reducer 4 [SIMPLE_EDGE]
                           SHUFFLE [RS_49]
@@ -128,91 +128,89 @@ Stage-0
                               Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(_col2)"],keys:_col0, _col1, 0L
                               Select Operator [SEL_46] (rows=171536292 width=280)
                                 Output:["_col0","_col1","_col2"]
-                                Merge Join Operator [MERGEJOIN_135] (rows=171536292 width=280)
-                                  Conds:RS_43._col7=RS_159._col0(Inner),Output:["_col2","_col6","_col7"]
+                                Merge Join Operator [MERGEJOIN_134] (rows=171536292 width=280)
+                                  Conds:RS_43._col7=RS_158._col0(Inner),Output:["_col2","_col6","_col7"]
                                 <-Reducer 10 [SIMPLE_EDGE] vectorized
-                                  SHUFFLE [RS_159]
+                                  SHUFFLE [RS_158]
                                     PartitionCols:_col0,null sort order:a,sort order:+
-                                    Select Operator [SEL_158] (rows=16 width=86)
+                                    Select Operator [SEL_157] (rows=16 width=86)
                                       Output:["_col0"]
-                                      Filter Operator [FIL_157] (rows=16 width=198)
+                                      Filter Operator [FIL_156] (rows=16 width=198)
                                         predicate:(rank_window_0 <= 5)
-                                        PTF Operator [PTF_156] (rows=49 width=198)
+                                        PTF Operator [PTF_155] (rows=49 width=198)
                                           Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col1 DESC NULLS LAST","partition by:":"_col0"}]
-                                          Select Operator [SEL_155] (rows=49 width=198)
+                                          Select Operator [SEL_154] (rows=49 width=198)
                                             Output:["_col0","_col1"]
                                           <-Reducer 9 [SIMPLE_EDGE] vectorized
-                                            SHUFFLE [RS_154]
+                                            SHUFFLE [RS_153]
                                               PartitionCols:_col0,null sort order:az,sort order:+-
-                                              Group By Operator [GBY_153] (rows=49 width=198)
+                                              Group By Operator [GBY_152] (rows=49 width=198)
                                                 Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0
                                               <-Reducer 8 [SIMPLE_EDGE]
                                                 SHUFFLE [RS_26]
                                                   PartitionCols:_col0,null sort order:a,sort order:+
                                                   Group By Operator [GBY_25] (rows=19404 width=198)
                                                     Output:["_col0","_col1"],aggregations:["sum(_col2)"],keys:_col5
-                                                    Top N Key Operator [TNK_85] (rows=525329897 width=192)
-                                                      keys:_col5,null sort order:a,sort order:+,top n:6
-                                                      Merge Join Operator [MERGEJOIN_134] (rows=525329897 width=192)
-                                                        Conds:RS_21._col1=RS_152._col0(Inner),Output:["_col2","_col5"]
-                                                      <-Reducer 2 [SIMPLE_EDGE]
-                                                        SHUFFLE [RS_21]
-                                                          PartitionCols:_col1,null sort order:a,sort order:+
-                                                          Merge Join Operator [MERGEJOIN_131] (rows=525329897 width=110)
-                                                            Conds:RS_146._col0=RS_138._col0(Inner),Output:["_col1","_col2"]
-                                                          <-Map 11 [SIMPLE_EDGE] vectorized
-                                                            SHUFFLE [RS_138]
-                                                              PartitionCols:_col0,null sort order:a,sort order:+
-                                                              Select Operator [SEL_137] (rows=317 width=8)
-                                                                Output:["_col0"]
-                                                                Filter Operator [FIL_136] (rows=317 width=8)
-                                                                  predicate:(d_month_seq BETWEEN 1212 AND 1223 and d_date_sk is not null)
-                                                                  TableScan [TS_3] (rows=73049 width=8)
-                                                                    default@date_dim,d1,Tbl:COMPLETE,Col:COMPLETE,Output:["d_date_sk","d_month_seq"]
-                                                          <-Map 1 [SIMPLE_EDGE] vectorized
-                                                            SHUFFLE [RS_146]
-                                                              PartitionCols:_col0,null sort order:a,sort order:+
-                                                              Select Operator [SEL_145] (rows=525329897 width=114)
-                                                                Output:["_col0","_col1","_col2"]
-                                                                Filter Operator [FIL_144] (rows=525329897 width=114)
-                                                                  predicate:(ss_sold_date_sk is not null and ss_store_sk is not null and ss_sold_date_sk BETWEEN DynamicValue(RS_38_d1_d_date_sk_min) AND DynamicValue(RS_38_d1_d_date_sk_max) and in_bloom_filter(ss_sold_date_sk, DynamicValue(RS_38_d1_d_date_sk_bloom_filter)))
-                                                                  TableScan [TS_0] (rows=575995635 width=114)
-                                                                    default@store_sales,store_sales,Tbl:COMPLETE,Col:COMPLETE,Output:["ss_sold_date_sk","ss_store_sk","ss_net_profit"]
-                                                                  <-Reducer 12 [BROADCAST_EDGE] vectorized
-                                                                    BROADCAST [RS_143]
-                                                                      Group By Operator [GBY_142] (rows=1 width=12)
-                                                                        Output:["_col0","_col1","_col2"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","bloom_filter(VALUE._col2, expectedEntries=1000000)"]
-                                                                      <-Map 11 [CUSTOM_SIMPLE_EDGE] vectorized
-                                                                        SHUFFLE [RS_141]
-                                                                          Group By Operator [GBY_140] (rows=1 width=12)
-                                                                            Output:["_col0","_col1","_col2"],aggregations:["min(_col0)","max(_col0)","bloom_filter(_col0, expectedEntries=1000000)"]
-                                                                            Select Operator [SEL_139] (rows=317 width=4)
-                                                                              Output:["_col0"]
-                                                                               Please refer to the previous Select Operator [SEL_137]
-                                                      <-Map 14 [SIMPLE_EDGE] vectorized
-                                                        SHUFFLE [RS_152]
-                                                          PartitionCols:_col0,null sort order:a,sort order:+
-                                                          Select Operator [SEL_151] (rows=1704 width=90)
-                                                            Output:["_col0","_col1"]
-                                                            Filter Operator [FIL_150] (rows=1704 width=90)
-                                                              predicate:(s_store_sk is not null and s_state is not null)
-                                                              TableScan [TS_15] (rows=1704 width=90)
-                                                                default@store,store,Tbl:COMPLETE,Col:COMPLETE,Output:["s_store_sk","s_state"]
+                                                    Merge Join Operator [MERGEJOIN_133] (rows=525329897 width=192)
+                                                      Conds:RS_21._col1=RS_151._col0(Inner),Output:["_col2","_col5"]
+                                                    <-Reducer 2 [SIMPLE_EDGE]
+                                                      SHUFFLE [RS_21]
+                                                        PartitionCols:_col1,null sort order:a,sort order:+
+                                                        Merge Join Operator [MERGEJOIN_130] (rows=525329897 width=110)
+                                                          Conds:RS_145._col0=RS_137._col0(Inner),Output:["_col1","_col2"]
+                                                        <-Map 11 [SIMPLE_EDGE] vectorized
+                                                          SHUFFLE [RS_137]
+                                                            PartitionCols:_col0,null sort order:a,sort order:+
+                                                            Select Operator [SEL_136] (rows=317 width=8)
+                                                              Output:["_col0"]
+                                                              Filter Operator [FIL_135] (rows=317 width=8)
+                                                                predicate:(d_month_seq BETWEEN 1212 AND 1223 and d_date_sk is not null)
+                                                                TableScan [TS_3] (rows=73049 width=8)
+                                                                  default@date_dim,d1,Tbl:COMPLETE,Col:COMPLETE,Output:["d_date_sk","d_month_seq"]
+                                                        <-Map 1 [SIMPLE_EDGE] vectorized
+                                                          SHUFFLE [RS_145]
+                                                            PartitionCols:_col0,null sort order:a,sort order:+
+                                                            Select Operator [SEL_144] (rows=525329897 width=114)
+                                                              Output:["_col0","_col1","_col2"]
+                                                              Filter Operator [FIL_143] (rows=525329897 width=114)
+                                                                predicate:(ss_sold_date_sk is not null and ss_store_sk is not null and ss_sold_date_sk BETWEEN DynamicValue(RS_38_d1_d_date_sk_min) AND DynamicValue(RS_38_d1_d_date_sk_max) and in_bloom_filter(ss_sold_date_sk, DynamicValue(RS_38_d1_d_date_sk_bloom_filter)))
+                                                                TableScan [TS_0] (rows=575995635 width=114)
+                                                                  default@store_sales,store_sales,Tbl:COMPLETE,Col:COMPLETE,Output:["ss_sold_date_sk","ss_store_sk","ss_net_profit"]
+                                                                <-Reducer 12 [BROADCAST_EDGE] vectorized
+                                                                  BROADCAST [RS_142]
+                                                                    Group By Operator [GBY_141] (rows=1 width=12)
+                                                                      Output:["_col0","_col1","_col2"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","bloom_filter(VALUE._col2, expectedEntries=1000000)"]
+                                                                    <-Map 11 [CUSTOM_SIMPLE_EDGE] vectorized
+                                                                      SHUFFLE [RS_140]
+                                                                        Group By Operator [GBY_139] (rows=1 width=12)
+                                                                          Output:["_col0","_col1","_col2"],aggregations:["min(_col0)","max(_col0)","bloom_filter(_col0, expectedEntries=1000000)"]
+                                                                          Select Operator [SEL_138] (rows=317 width=4)
+                                                                            Output:["_col0"]
+                                                                             Please refer to the previous Select Operator [SEL_136]
+                                                    <-Map 14 [SIMPLE_EDGE] vectorized
+                                                      SHUFFLE [RS_151]
+                                                        PartitionCols:_col0,null sort order:a,sort order:+
+                                                        Select Operator [SEL_150] (rows=1704 width=90)
+                                                          Output:["_col0","_col1"]
+                                                          Filter Operator [FIL_149] (rows=1704 width=90)
+                                                            predicate:(s_store_sk is not null and s_state is not null)
+                                                            TableScan [TS_15] (rows=1704 width=90)
+                                                              default@store,store,Tbl:COMPLETE,Col:COMPLETE,Output:["s_store_sk","s_state"]
                                 <-Reducer 3 [SIMPLE_EDGE]
                                   SHUFFLE [RS_43]
                                     PartitionCols:_col7,null sort order:a,sort order:+
-                                    Merge Join Operator [MERGEJOIN_132] (rows=525329897 width=290)
-                                      Conds:RS_40._col1=RS_149._col0(Inner),Output:["_col2","_col6","_col7"]
+                                    Merge Join Operator [MERGEJOIN_131] (rows=525329897 width=290)
+                                      Conds:RS_40._col1=RS_148._col0(Inner),Output:["_col2","_col6","_col7"]
                                     <-Reducer 2 [SIMPLE_EDGE]
                                       SHUFFLE [RS_40]
                                         PartitionCols:_col1,null sort order:a,sort order:+
-                                         Please refer to the previous Merge Join Operator [MERGEJOIN_131]
+                                         Please refer to the previous Merge Join Operator [MERGEJOIN_130]
                                     <-Map 13 [SIMPLE_EDGE] vectorized
-                                      SHUFFLE [RS_149]
+                                      SHUFFLE [RS_148]
                                         PartitionCols:_col0,null sort order:a,sort order:+
-                                        Select Operator [SEL_148] (rows=1704 width=188)
+                                        Select Operator [SEL_147] (rows=1704 width=188)
                                           Output:["_col0","_col1","_col2"]
-                                          Filter Operator [FIL_147] (rows=1704 width=188)
+                                          Filter Operator [FIL_146] (rows=1704 width=188)
                                             predicate:(s_state is not null and s_store_sk is not null)
                                             TableScan [TS_6] (rows=1704 width=188)
                                               default@store,store,Tbl:COMPLETE,Col:COMPLETE,Output:["s_store_sk","s_county","s_state"]