You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2019/04/27 01:31:45 UTC

[hive] 03/04: HIVE-21633: Estimate range for value generated by aggregate function in statistics annotation (Jesus Camacho Rodriguez, reviewed by Vineet Garg)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit 4cab80d8a8007c3cc332aa33279d8c7ebde48ed2
Author: Jesus Camacho Rodriguez <jc...@apache.org>
AuthorDate: Thu Apr 18 13:32:38 2019 -0700

    HIVE-21633: Estimate range for value generated by aggregate function in statistics annotation (Jesus Camacho Rodriguez, reviewed by Vineet Garg)
    
    Close apache/hive#603
---
 .../stats/annotation/StatsRulesProcFactory.java    | 78 ++++++++++++++++++++++
 .../clientpositive/groupby_grouping_window.q.out   | 12 ++--
 .../clientpositive/llap/subquery_scalar.q.out      |  8 +--
 .../clientpositive/llap/subquery_select.q.out      | 12 ++--
 .../llap/vector_groupby_grouping_window.q.out      | 12 ++--
 .../perf/tez/constraints/query78.q.out             | 14 ++--
 .../results/clientpositive/perf/tez/query78.q.out  | 16 ++---
 7 files changed, 115 insertions(+), 37 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index 6a1c210..0258e36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -84,7 +84,12 @@ import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
 import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
 import org.apache.hadoop.hive.ql.stats.OperatorStats;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
@@ -1479,6 +1484,7 @@ public class StatsRulesProcFactory {
       // if UDAFs are present, new columns needs to be added
       if (!aggDesc.isEmpty() && stats != null) {
         List<ColStatistics> aggColStats = Lists.newArrayList();
+        int idx = 0;
         for (ColumnInfo ci : rs.getSignature()) {
 
           // if the columns in row schema is not contained in column
@@ -1492,6 +1498,7 @@ public class StatsRulesProcFactory {
             cs.setCountDistint(stats.getNumRows());
             cs.setNumNulls(0);
             cs.setAvgColLen(StatsUtils.getAvgColLenOf(conf, ci.getObjectInspector(), colType));
+            computeAggregateColumnMinMax(cs, conf, aggDesc.get(idx++), colType, parentStats);
             aggColStats.add(cs);
           }
         }
@@ -1524,6 +1531,77 @@ public class StatsRulesProcFactory {
       return null;
     }
 
+    /**
+     * If possible, sets the min / max value for the column based on the aggregate function
+     * being calculated and its input.
+     */
+    private static void computeAggregateColumnMinMax(ColStatistics cs, HiveConf conf, AggregationDesc agg, String aggType,
+        Statistics parentStats) throws SemanticException {
+      if (agg.getParameters() != null && agg.getParameters().size() == 1) {
+        ColStatistics parentCS = StatsUtils.getColStatisticsFromExpression(
+            conf, parentStats, agg.getParameters().get(0));
+        if (parentCS != null && parentCS.getRange() != null &&
+            parentCS.getRange().minValue != null && parentCS.getRange().maxValue != null) {
+          long valuesCount = agg.getDistinct() ?
+              parentCS.getCountDistint() :
+              parentStats.getNumRows() - parentCS.getNumNulls();
+          Range range = parentCS.getRange();
+          // Get the aggregate function matching the name in the query.
+          GenericUDAFResolver udaf =
+              FunctionRegistry.getGenericUDAFResolver(agg.getGenericUDAFName());
+          if (udaf instanceof GenericUDAFCount) {
+            cs.setRange(new Range(0, valuesCount));
+          } else if (udaf instanceof GenericUDAFMax || udaf instanceof GenericUDAFMin) {
+            cs.setRange(new Range(range.minValue, range.maxValue));
+          } else if (udaf instanceof GenericUDAFSum) {
+            switch (aggType) {
+            case serdeConstants.TINYINT_TYPE_NAME:
+            case serdeConstants.SMALLINT_TYPE_NAME:
+            case serdeConstants.DATE_TYPE_NAME:
+            case serdeConstants.INT_TYPE_NAME:
+            case serdeConstants.BIGINT_TYPE_NAME:
+              long maxValueLong = range.maxValue.longValue();
+              long minValueLong = range.minValue.longValue();
+              // If min value is less or equal to max value (legal)
+              if (minValueLong <= maxValueLong && minValueLong >= 0) {
+                // min = minValue, max = (minValue + maxValue) * 0.5 * parentNumRows
+                cs.setRange(new Range(
+                    minValueLong,
+                    StatsUtils.safeMult(
+                        StatsUtils.safeMult(StatsUtils.safeAdd(minValueLong, maxValueLong), 0.5),
+                        valuesCount)));
+              }
+              break;
+            case serdeConstants.FLOAT_TYPE_NAME:
+            case serdeConstants.DOUBLE_TYPE_NAME:
+              double maxValueDouble = range.maxValue.doubleValue();
+              double minValueDouble = range.minValue.doubleValue();
+              // If min value is less or equal to max value (legal)
+              if (minValueDouble <= maxValueDouble && minValueDouble >= 0) {
+                // min = minValue, max = (minValue + maxValue) * 0.5 * parentNumRows
+                cs.setRange(new Range(
+                    minValueDouble,
+                    (minValueDouble + maxValueDouble) * 0.5 * valuesCount));
+              }
+              break;
+            default:
+              if (aggType.startsWith(serdeConstants.DECIMAL_TYPE_NAME)) {
+                BigDecimal maxValueBD = new BigDecimal(range.maxValue.toString());
+                BigDecimal minValueBD = new BigDecimal(range.minValue.toString());
+                // If min value is less or equal to max value (legal)
+                if (minValueBD.compareTo(maxValueBD) <= 0 && minValueBD.compareTo(BigDecimal.ZERO) >= 0) {
+                  // min = minValue, max = (minValue + maxValue) * 0.5 * parentNumRows
+                  cs.setRange(new Range(
+                      minValueBD,
+                      minValueBD.add(maxValueBD).multiply(new BigDecimal(0.5)).multiply(new BigDecimal(valuesCount))));
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+
     private long getParentNumRows(GroupByOperator op, List<ExprNodeDesc> gbyKeys, HiveConf conf) {
       if(gbyKeys == null || gbyKeys.isEmpty()) {
         return op.getParentOperators().get(0).getStatistics().getNumRows();
diff --git a/ql/src/test/results/clientpositive/groupby_grouping_window.q.out b/ql/src/test/results/clientpositive/groupby_grouping_window.q.out
index e6cc459..7f687da 100644
--- a/ql/src/test/results/clientpositive/groupby_grouping_window.q.out
+++ b/ql/src/test/results/clientpositive/groupby_grouping_window.q.out
@@ -75,7 +75,7 @@ STAGE PLANS:
           pruneGroupingSetId: true
           Filter Operator
             predicate: (_col3 > 0) (type: boolean)
-            Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+            Statistics: Num rows: 3 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
             File Output Operator
               compressed: false
               table:
@@ -91,14 +91,14 @@ STAGE PLANS:
               key expressions: _col0 (type: int), _col3 (type: int)
               sort order: ++
               Map-reduce partition columns: _col0 (type: int)
-              Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+              Statistics: Num rows: 3 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
               value expressions: _col2 (type: int)
       Execution mode: vectorized
       Reduce Operator Tree:
         Select Operator
           expressions: KEY.reducesinkkey0 (type: int), VALUE._col1 (type: int), KEY.reducesinkkey1 (type: int)
           outputColumnNames: _col0, _col2, _col3
-          Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+          Statistics: Num rows: 3 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
           PTF Operator
             Function definitions:
                 Input definition
@@ -119,14 +119,14 @@ STAGE PLANS:
                         window function: GenericUDAFRankEvaluator
                         window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
                         isPivotResult: true
-            Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+            Statistics: Num rows: 3 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
             Select Operator
               expressions: _col0 (type: int), _col2 (type: int), _col3 (type: int), rank_window_0 (type: int)
               outputColumnNames: _col0, _col1, _col2, _col3
-              Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+              Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
               File Output Operator
                 compressed: false
-                Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
                 table:
                     input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                     output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
diff --git a/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out b/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out
index 0d8ff14..5817f98 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out
@@ -1500,7 +1500,7 @@ STAGE PLANS:
                 keys:
                   0 _col0 (type: string)
                   1 _col0 (type: string)
-                Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: COMPLETE
                 Group By Operator
                   aggregations: count()
                   minReductionHashAggr: 0.0
@@ -1537,16 +1537,16 @@ STAGE PLANS:
                 Statistics: Num rows: 13 Data size: 1625 Basic stats: COMPLETE Column stats: COMPLETE
                 Filter Operator
                   predicate: (_col1 > 100) (type: boolean)
-                  Statistics: Num rows: 4 Data size: 500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 13 Data size: 1625 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col0 (type: string)
                     outputColumnNames: _col0
-                    Statistics: Num rows: 4 Data size: 484 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 13 Data size: 1573 Basic stats: COMPLETE Column stats: COMPLETE
                     Reduce Output Operator
                       key expressions: _col0 (type: string)
                       sort order: +
                       Map-reduce partition columns: _col0 (type: string)
-                      Statistics: Num rows: 4 Data size: 484 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 13 Data size: 1573 Basic stats: COMPLETE Column stats: COMPLETE
 
   Stage: Stage-0
     Fetch Operator
diff --git a/ql/src/test/results/clientpositive/llap/subquery_select.q.out b/ql/src/test/results/clientpositive/llap/subquery_select.q.out
index fc70407..d58905c 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_select.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_select.q.out
@@ -5057,16 +5057,16 @@ STAGE PLANS:
                   Statistics: Num rows: 13 Data size: 208 Basic stats: COMPLETE Column stats: COMPLETE
                   Filter Operator
                     predicate: (_col2 > 0L) (type: boolean)
-                    Statistics: Num rows: 4 Data size: 64 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 13 Data size: 208 Basic stats: COMPLETE Column stats: COMPLETE
                     Select Operator
                       expressions: _col1 (type: int)
                       outputColumnNames: _col0
-                      Statistics: Num rows: 4 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 13 Data size: 52 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
                         key expressions: _col0 (type: int)
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 4 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+                        Statistics: Num rows: 13 Data size: 52 Basic stats: COMPLETE Column stats: COMPLETE
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -5534,16 +5534,16 @@ STAGE PLANS:
                   Statistics: Num rows: 13 Data size: 208 Basic stats: COMPLETE Column stats: COMPLETE
                   Filter Operator
                     predicate: (_col2 > 0L) (type: boolean)
-                    Statistics: Num rows: 4 Data size: 64 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 13 Data size: 208 Basic stats: COMPLETE Column stats: COMPLETE
                     Select Operator
                       expressions: _col1 (type: int)
                       outputColumnNames: _col0
-                      Statistics: Num rows: 4 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 13 Data size: 52 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
                         key expressions: _col0 (type: int)
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 4 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+                        Statistics: Num rows: 13 Data size: 52 Basic stats: COMPLETE Column stats: COMPLETE
         Reducer 6 
             Execution mode: llap
             Reduce Operator Tree:
diff --git a/ql/src/test/results/clientpositive/llap/vector_groupby_grouping_window.q.out b/ql/src/test/results/clientpositive/llap/vector_groupby_grouping_window.q.out
index 3a9ea79..5e391bf 100644
--- a/ql/src/test/results/clientpositive/llap/vector_groupby_grouping_window.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_groupby_grouping_window.q.out
@@ -151,7 +151,7 @@ STAGE PLANS:
                       native: true
                       predicateExpression: FilterLongColGreaterLongScalar(col 2:int, val 0)
                   predicate: (_col3 > 0) (type: boolean)
-                  Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 3 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
                     key expressions: _col0 (type: int), _col3 (type: int)
                     sort order: ++
@@ -163,7 +163,7 @@ STAGE PLANS:
                         nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
                         partitionColumns: 0:int
                         valueColumns: 1:int
-                    Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 3 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col2 (type: int)
         Reducer 3 
             Execution mode: vectorized, llap
@@ -188,7 +188,7 @@ STAGE PLANS:
                     className: VectorSelectOperator
                     native: true
                     projectedOutputColumnNums: [0, 2, 1]
-                Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 3 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
                 PTF Operator
                   Function definitions:
                       Input definition
@@ -222,7 +222,7 @@ STAGE PLANS:
                       outputTypes: [int, int, int, int]
                       partitionExpressions: [col 0:int]
                       streamingColumns: [3]
-                  Statistics: Num rows: 1 Data size: 20 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 3 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _col0 (type: int), _col2 (type: int), _col3 (type: int), rank_window_0 (type: int)
                     outputColumnNames: _col0, _col1, _col2, _col3
@@ -230,13 +230,13 @@ STAGE PLANS:
                         className: VectorSelectOperator
                         native: true
                         projectedOutputColumnNums: [0, 2, 1, 3]
-                    Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
                       compressed: false
                       File Sink Vectorization:
                           className: VectorFileSinkOperator
                           native: false
-                      Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                           output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
diff --git a/ql/src/test/results/clientpositive/perf/tez/constraints/query78.q.out b/ql/src/test/results/clientpositive/perf/tez/constraints/query78.q.out
index 888d335..792540f 100644
--- a/ql/src/test/results/clientpositive/perf/tez/constraints/query78.q.out
+++ b/ql/src/test/results/clientpositive/perf/tez/constraints/query78.q.out
@@ -158,18 +158,18 @@ Stage-0
       File Output Operator [FS_269]
         Limit [LIM_268] (rows=100 width=484)
           Number of rows:100
-          Select Operator [SEL_267] (rows=203549242538 width=483)
+          Select Operator [SEL_267] (rows=1831943309558 width=483)
             Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
           <-Reducer 5 [SIMPLE_EDGE]
             SHUFFLE [RS_73]
-              Select Operator [SEL_72] (rows=203549242538 width=719)
+              Select Operator [SEL_72] (rows=1831943309558 width=719)
                 Output:["_col0","_col1","_col6","_col7","_col8","_col9","_col10","_col11","_col12"]
-                Merge Join Operator [MERGEJOIN_220] (rows=203549242538 width=703)
+                Merge Join Operator [MERGEJOIN_220] (rows=1831943309558 width=703)
                   Conds:RS_69._col0, _col1=RS_266._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col6","_col7","_col8","_col11","_col12","_col13"]
                 <-Reducer 12 [ONE_TO_ONE_EDGE] vectorized
                   FORWARD [RS_266]
                     PartitionCols:_col0, _col1
-                    Filter Operator [FIL_265] (rows=13513323 width=239)
+                    Filter Operator [FIL_265] (rows=40539971 width=239)
                       predicate:(_col2 > 0L)
                       Group By Operator [GBY_264] (rows=40539971 width=239)
                         Output:["_col0","_col1","_col2","_col3","_col4"],aggregations:["sum(VALUE._col0)","sum(VALUE._col1)","sum(VALUE._col2)"],keys:KEY._col0, KEY._col1
@@ -228,7 +228,7 @@ Stage-0
                 <-Reducer 4 [SIMPLE_EDGE]
                   SHUFFLE [RS_69]
                     PartitionCols:_col0, _col1
-                    Merge Join Operator [MERGEJOIN_219] (rows=7613716536 width=471)
+                    Merge Join Operator [MERGEJOIN_219] (rows=22841150061 width=471)
                       Conds:RS_244._col1=RS_256._col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col6","_col7","_col8"]
                     <-Reducer 3 [SIMPLE_EDGE] vectorized
                       SHUFFLE [RS_244]
@@ -287,9 +287,9 @@ Stage-0
                     <-Reducer 9 [SIMPLE_EDGE] vectorized
                       SHUFFLE [RS_256]
                         PartitionCols:_col0
-                        Select Operator [SEL_255] (rows=33694814 width=235)
+                        Select Operator [SEL_255] (rows=101084444 width=235)
                           Output:["_col0","_col1","_col2","_col3"]
-                          Filter Operator [FIL_254] (rows=33694814 width=239)
+                          Filter Operator [FIL_254] (rows=101084444 width=239)
                             predicate:(_col2 > 0L)
                             Select Operator [SEL_253] (rows=101084444 width=239)
                               Output:["_col1","_col2","_col3","_col4"]
diff --git a/ql/src/test/results/clientpositive/perf/tez/query78.q.out b/ql/src/test/results/clientpositive/perf/tez/query78.q.out
index e66d6f5..9ce2cdb 100644
--- a/ql/src/test/results/clientpositive/perf/tez/query78.q.out
+++ b/ql/src/test/results/clientpositive/perf/tez/query78.q.out
@@ -158,20 +158,20 @@ Stage-0
       File Output Operator [FS_276]
         Limit [LIM_275] (rows=100 width=484)
           Number of rows:100
-          Select Operator [SEL_274] (rows=203549242531 width=483)
+          Select Operator [SEL_274] (rows=1831943309424 width=483)
             Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"]
           <-Reducer 5 [SIMPLE_EDGE]
             SHUFFLE [RS_76]
-              Select Operator [SEL_75] (rows=203549242531 width=719)
+              Select Operator [SEL_75] (rows=1831943309424 width=719)
                 Output:["_col0","_col1","_col2","_col6","_col7","_col8","_col9","_col10","_col11","_col12"]
-                Merge Join Operator [MERGEJOIN_223] (rows=203549242531 width=715)
+                Merge Join Operator [MERGEJOIN_223] (rows=1831943309424 width=715)
                   Conds:RS_72._col1=RS_273._col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col7","_col8","_col9","_col11","_col12","_col13","_col14","_col15"]
                 <-Reducer 12 [SIMPLE_EDGE] vectorized
                   SHUFFLE [RS_273]
                     PartitionCols:_col0
-                    Select Operator [SEL_272] (rows=33694814 width=247)
+                    Select Operator [SEL_272] (rows=101084444 width=247)
                       Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-                      Filter Operator [FIL_271] (rows=33694814 width=239)
+                      Filter Operator [FIL_271] (rows=101084444 width=239)
                         predicate:(_col2 > 0L)
                         Select Operator [SEL_270] (rows=101084444 width=239)
                           Output:["_col1","_col2","_col3","_col4"]
@@ -234,7 +234,7 @@ Stage-0
                 <-Reducer 4 [SIMPLE_EDGE]
                   SHUFFLE [RS_72]
                     PartitionCols:_col1
-                    Merge Join Operator [MERGEJOIN_222] (rows=3053485049 width=471)
+                    Merge Join Operator [MERGEJOIN_222] (rows=9160455599 width=471)
                       Conds:RS_248._col1, _col0=RS_260._col1, _col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col7","_col8","_col9"]
                     <-Reducer 3 [ONE_TO_ONE_EDGE] vectorized
                       FORWARD [RS_248]
@@ -295,9 +295,9 @@ Stage-0
                     <-Reducer 9 [ONE_TO_ONE_EDGE] vectorized
                       FORWARD [RS_260]
                         PartitionCols:_col1, _col0
-                        Select Operator [SEL_259] (rows=13513323 width=239)
+                        Select Operator [SEL_259] (rows=40539971 width=239)
                           Output:["_col0","_col1","_col2","_col3","_col4"]
-                          Filter Operator [FIL_258] (rows=13513323 width=239)
+                          Filter Operator [FIL_258] (rows=40539971 width=239)
                             predicate:(_col2 > 0L)
                             Group By Operator [GBY_257] (rows=40539971 width=239)
                               Output:["_col0","_col1","_col2","_col3","_col4"],aggregations:["sum(VALUE._col0)","sum(VALUE._col1)","sum(VALUE._col2)"],keys:KEY._col0, KEY._col1