You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/05/10 08:51:45 UTC

hive git commit: HIVE-16599: NPE in runtime filtering cost when handling SMB Joins (Deepak Jaiswal, reviewed by Jason Dere)

Repository: hive
Updated Branches:
  refs/heads/master 57ae3aca0 -> 3db226857


HIVE-16599: NPE in runtime filtering cost when handling SMB Joins (Deepak Jaiswal, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3db22685
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3db22685
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3db22685

Branch: refs/heads/master
Commit: 3db22685748d689dfa2929644c82c0801a73726c
Parents: 57ae3ac
Author: Jason Dere <jd...@hortonworks.com>
Authored: Wed May 10 01:51:21 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Wed May 10 01:51:21 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/parse/TezCompiler.java       |   6 +-
 .../hadoop/hive/ql/plan/ExprNodeDescUtils.java  |   8 +-
 .../dynamic_semijoin_reduction_2.q              |  48 +++
 .../llap/dynamic_semijoin_reduction_2.q.out     | 305 +++++++++++++++++++
 4 files changed, 363 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3db22685/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 078704e..f469cd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -671,7 +671,6 @@ public class TezCompiler extends TaskCompiler {
           continue;
         }
 
-        assert parent instanceof SelectOperator;
         while (parent != null) {
           if (parent instanceof TableScanOperator) {
             tsOps.add((TableScanOperator) parent);
@@ -685,10 +684,11 @@ public class TezCompiler extends TaskCompiler {
     // a semijoin filter on any of them, if so, remove it.
 
     ParseContext pctx = procCtx.parseContext;
+    Set<ReduceSinkOperator> rsSet = new HashSet<>(pctx.getRsToSemiJoinBranchInfo().keySet());
     for (TableScanOperator ts : tsOps) {
-      for (ReduceSinkOperator rs : pctx.getRsToSemiJoinBranchInfo().keySet()) {
+      for (ReduceSinkOperator rs : rsSet) {
         SemiJoinBranchInfo sjInfo = pctx.getRsToSemiJoinBranchInfo().get(rs);
-        if (ts == sjInfo.getTsOp()) {
+        if (sjInfo != null && ts == sjInfo.getTsOp()) {
           // match!
           if (LOG.isDebugEnabled()) {
             LOG.debug("Semijoin optimization found going to SMB join. Removing semijoin "

http://git-wip-us.apache.org/repos/asf/hive/blob/3db22685/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
index bfc1eca..01fab9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.optimizer.ConstantPropagateProcFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -843,7 +844,12 @@ public class ExprNodeDescUtils {
       ExprNodeColumnDesc parentCol = ExprNodeDescUtils.getColumnExpr(parentExpr);
       if (parentCol != null) {
         for (Operator<?> currParent : op.getParentOperators()) {
-          if (currParent.getSchema().getTableNames().contains(parentCol.getTabAlias())) {
+          RowSchema schema = currParent.getSchema();
+          if (schema == null) {
+            // Happens in case of TezDummyStoreOperator
+            return null;
+          }
+          if (schema.getTableNames().contains(parentCol.getTabAlias())) {
             parentOp = currParent;
             break;
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/3db22685/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_2.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_2.q
index 55f6e8a..97b3d84 100644
--- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_2.q
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_2.q
@@ -31,6 +31,7 @@ EXPLAIN
 SELECT
 COUNT(*)
 FROM table_1 t1
+
 INNER JOIN table_18 t2 ON (((t2.tinyint_col_15) = (t1.bigint_col_7)) AND
 ((t2.decimal2709_col_9) = (t1.decimal2016_col_26))) AND
 ((t2.tinyint_col_20) = (t1.tinyint_col_3))
@@ -42,3 +43,50 @@ WHERE (t1.timestamp_col_9) = (tt2.timestamp_col_18));
 
 drop table table_1;
 drop table table_18;
+
+-- HIVE-16599
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+CREATE TABLE src2 as select * from src1;
+insert into src2 select * from src2;
+insert into src2 select * from src2;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+set hive.strict.checks.bucketing=false;
+set hive.join.emit.interval=2;
+set hive.stats.fetch.column.stats=true;
+set hive.optimize.bucketingsorting=false;
+set hive.stats.autogather=true;
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+set hive.convert.join.bucket.mapjoin.tez = true;
+set hive.auto.convert.sortmerge.join = true;
+
+set hive.auto.convert.join.noconditionaltask.size=0;
+set hive.mapjoin.hybridgrace.minwbsize=125;
+set hive.mapjoin.hybridgrace.minnumpartitions=4;
+
+set hive.llap.memory.oversubscription.max.executors.per.query=3;
+
+CREATE TABLE tab2 (key int, value string, ds string);
+
+set hive.exec.dynamic.partition.mode=nonstrict
+insert into tab2select key, value, ds from tab;
+analyze table tab2 compute statistics;
+analyze table tab2 compute statistics for columns;
+
+
+explain
+select
+  count(*)
+  from
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) a
+  join
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) b
+  on
+  a.key = b.key join src1 c on a.value = c.value where c.key < 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/3db22685/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out
index cb69251..650dc9f 100644
--- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out
@@ -40,6 +40,7 @@ PREHOOK: query: EXPLAIN
 SELECT
 COUNT(*)
 FROM table_1 t1
+
 INNER JOIN table_18 t2 ON (((t2.tinyint_col_15) = (t1.bigint_col_7)) AND
 ((t2.decimal2709_col_9) = (t1.decimal2016_col_26))) AND
 ((t2.tinyint_col_20) = (t1.tinyint_col_3))
@@ -53,6 +54,7 @@ POSTHOOK: query: EXPLAIN
 SELECT
 COUNT(*)
 FROM table_1 t1
+
 INNER JOIN table_18 t2 ON (((t2.tinyint_col_15) = (t1.bigint_col_7)) AND
 ((t2.decimal2709_col_9) = (t1.decimal2016_col_26))) AND
 ((t2.tinyint_col_20) = (t1.tinyint_col_3))
@@ -299,3 +301,306 @@ POSTHOOK: query: drop table table_18
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@table_18
 POSTHOOK: Output: default@table_18
+PREHOOK: query: CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcbucket_mapjoin
+POSTHOOK: query: CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcbucket_mapjoin
+PREHOOK: query: CREATE TABLE src2 as select * from src1
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src1
+PREHOOK: Output: database:default
+PREHOOK: Output: default@src2
+POSTHOOK: query: CREATE TABLE src2 as select * from src1
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src1
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@src2
+POSTHOOK: Lineage: src2.key SIMPLE [(src1)src1.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src2.value SIMPLE [(src1)src1.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert into src2 select * from src2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src2
+PREHOOK: Output: default@src2
+POSTHOOK: query: insert into src2 select * from src2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src2
+POSTHOOK: Output: default@src2
+POSTHOOK: Lineage: src2.key SIMPLE [(src2)src2.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: src2.value SIMPLE [(src2)src2.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: insert into src2 select * from src2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src2
+PREHOOK: Output: default@src2
+POSTHOOK: query: insert into src2 select * from src2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src2
+POSTHOOK: Output: default@src2
+POSTHOOK: Lineage: src2.key SIMPLE [(src2)src2.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: src2.value SIMPLE [(src2)src2.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin
+POSTHOOK: Output: default@srcbucket_mapjoin@ds=2008-04-08
+PREHOOK: query: load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin@ds=2008-04-08
+PREHOOK: query: CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tab
+POSTHOOK: query: CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tab
+PREHOOK: query: insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_mapjoin
+PREHOOK: Input: default@srcbucket_mapjoin@ds=2008-04-08
+PREHOOK: Output: default@tab@ds=2008-04-08
+POSTHOOK: query: insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_mapjoin
+POSTHOOK: Input: default@srcbucket_mapjoin@ds=2008-04-08
+POSTHOOK: Output: default@tab@ds=2008-04-08
+POSTHOOK: Lineage: tab PARTITION(ds=2008-04-08).key SIMPLE [(srcbucket_mapjoin)srcbucket_mapjoin.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: tab PARTITION(ds=2008-04-08).value SIMPLE [(srcbucket_mapjoin)srcbucket_mapjoin.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: CREATE TABLE tab2 (key int, value string, ds string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tab2
+POSTHOOK: query: CREATE TABLE tab2 (key int, value string, ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tab2
+Warning: Value had a \n character in it.
+PREHOOK: query: analyze table tab2 compute statistics
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+PREHOOK: Output: default@tab2
+POSTHOOK: query: analyze table tab2 compute statistics
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+POSTHOOK: Output: default@tab2
+PREHOOK: query: analyze table tab2 compute statistics for columns
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab2
+PREHOOK: Output: default@tab2
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table tab2 compute statistics for columns
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab2
+POSTHOOK: Output: default@tab2
+#### A masked pattern was here ####
+PREHOOK: query: explain
+select
+  count(*)
+  from
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) a
+  join
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) b
+  on
+  a.key = b.key join src1 c on a.value = c.value where c.key < 0
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select
+  count(*)
+  from
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) a
+  join
+  (select x.key as key, min(x.value) as value from tab2 x group by x.key) b
+  on
+  a.key = b.key join src1 c on a.value = c.value where c.key < 0
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 8 <- Reducer 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+        Reducer 3 <- Map 8 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  filterExpr: key is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 88 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 88 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      aggregations: min(value)
+                      keys: key (type: int)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: string)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  filterExpr: key is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      keys: key (type: int)
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 8 
+            Map Operator Tree:
+                TableScan
+                  alias: c
+                  filterExpr: ((UDFToDouble(key) < 0.0) and value is not null and (value BETWEEN DynamicValue(RS_21_x__col1_min) AND DynamicValue(RS_21_x__col1_max) and in_bloom_filter(value, DynamicValue(RS_21_x__col1_bloom_filter)))) (type: boolean)
+                  Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) < 0.0) and value is not null and (value BETWEEN DynamicValue(RS_21_x__col1_min) AND DynamicValue(RS_21_x__col1_max) and in_bloom_filter(value, DynamicValue(RS_21_x__col1_bloom_filter)))) (type: boolean)
+                    Statistics: Num rows: 8 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col1
+                      Statistics: Num rows: 8 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col1 (type: string)
+                        Statistics: Num rows: 8 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0)
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: COMPLETE
+                Filter Operator
+                  predicate: _col1 is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: COMPLETE
+                  Merge Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    keys:
+                      0 _col0 (type: int)
+                      1 _col0 (type: int)
+                    outputColumnNames: _col1
+                    Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col1 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col1 (type: string)
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: _col1 (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE
+                      Group By Operator
+                        aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=2)
+                        mode: hash
+                        outputColumnNames: _col0, _col1, _col2
+                        Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                          value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col1 (type: string)
+                  1 _col1 (type: string)
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                    value expressions: _col0 (type: bigint)
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=2)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+