You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/11/20 22:46:52 UTC

[12/12] hive git commit: HIVE-12437: SMB join in tez fails when one of the tables is empty (Vikram Dixit K, reviewed by Siddharth Seth)

HIVE-12437: SMB join in tez fails when one of the tables is empty (Vikram Dixit K, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a13e5fb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a13e5fb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a13e5fb

Branch: refs/heads/master-fixed
Commit: 4a13e5fb24837820cb6cdd1093bc8920c639b122
Parents: 39bd58b
Author: vikram <vi...@hortonworks.com>
Authored: Fri Nov 20 13:14:58 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri Nov 20 13:46:23 2015 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/tez/CustomPartitionVertex.java |   9 +-
 .../hive/ql/exec/tez/MapRecordProcessor.java    |  14 +-
 .../test/queries/clientpositive/tez_smb_empty.q |  13 ++
 .../clientpositive/tez/tez_smb_empty.q.out      | 168 +++++++++++++++++++
 4 files changed, 200 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4a13e5fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index fe1ef37..e9c14b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -319,9 +319,9 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
     // the bucket to task map should have been setup by the big table.
     LOG.info("Processing events for input " + inputName);
-    if (bucketToTaskMap.isEmpty()) {
-      LOG.info("We don't have a routing table yet. Will need to wait for the main input"
-          + " initialization");
+    if (inputNameInputSpecMap.get(mainWorkName) == null) {
+      LOG.info("We don't have a routing table yet. Will need to wait for the main input "
+          + mainWorkName + " initialization");
       inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
       return;
     }
@@ -351,6 +351,9 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
 
     for (Entry<Integer, Collection<ByteBuffer>> entry : bucketToSerializedSplitMap.asMap().entrySet()) {
       Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
+      if ((destTasks == null) || (destTasks.isEmpty())) {
+        continue;
+      }
       for (Integer task : destTasks) {
         int count = 0;
         for (ByteBuffer buf : entry.getValue()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/4a13e5fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 1d645a0..914b4e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -201,7 +201,19 @@ public class MapRecordProcessor extends RecordProcessor {
             jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
             mergeMapOp.initialize(jconf, null);
             // if there are no files/partitions to read, we need to skip trying to read
-            boolean skipRead = mergeMapOp.getConf().getPathToAliases().isEmpty();
+            MultiMRInput multiMRInput = multiMRInputMap.get(mergeMapWork.getName());
+            boolean skipRead = false;
+            if (multiMRInput == null) {
+              l4j.info("Multi MR Input for work " + mergeMapWork.getName() + " is null. Skipping read.");
+              skipRead = true;
+            } else {
+              Collection<KeyValueReader> keyValueReaders = multiMRInput.getKeyValueReaders();
+              if ((keyValueReaders == null) || (keyValueReaders.isEmpty())) {
+                l4j.info("Key value readers are null or empty and hence skipping read. "
+                    + "KeyValueReaders = " + keyValueReaders);
+                skipRead = true;
+              }
+            }
             if (skipRead) {
               List<Operator<?>> children = new ArrayList<Operator<?>>();
               children.addAll(mergeMapOp.getConf().getAliasToWork().values());

http://git-wip-us.apache.org/repos/asf/hive/blob/4a13e5fb/ql/src/test/queries/clientpositive/tez_smb_empty.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_empty.q b/ql/src/test/queries/clientpositive/tez_smb_empty.q
index 196cc97..2427377 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_empty.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_empty.q
@@ -53,3 +53,16 @@ explain
 select count(*) from tab s1 left outer join empty s2 on s1.key=s2.key join tab s3 on s1.key = s3.key;
 
 select count(*) from tab s1 left outer join empty s2 on s1.key=s2.key join tab s3 on s1.key = s3.key;
+
+explain
+select count(*) from empty s1 join empty s3 on s1.key=s3.key;
+
+select count(*) from empty s1 join empty s3 on s1.key=s3.key;
+
+set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ;
+
+explain
+select count(*) from empty s1 join tab s3 on s1.key=s3.key;
+
+select count(*) from empty s1 join tab s3 on s1.key=s3.key;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/4a13e5fb/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out b/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
index 82ec31d..a32fb12 100644
--- a/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
+++ b/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
@@ -674,3 +674,171 @@ POSTHOOK: Input: default@tab
 POSTHOOK: Input: default@tab@ds=2008-04-08
 #### A masked pattern was here ####
 480
+PREHOOK: query: explain
+select count(*) from empty s1 join empty s3 on s1.key=s3.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from empty s1 join empty s3 on s1.key=s3.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: s1
+                  Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+            Map Operator Tree:
+                TableScan
+                  alias: s1
+                  Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                      Merge Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                        Group By Operator
+                          aggregations: count()
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: bigint)
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from empty s1 join empty s3 on s1.key=s3.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@empty
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from empty s1 join empty s3 on s1.key=s3.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@empty
+#### A masked pattern was here ####
+0
+PREHOOK: query: explain
+select count(*) from empty s1 join tab s3 on s1.key=s3.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from empty s1 join tab s3 on s1.key=s3.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: s3
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+            Map Operator Tree:
+                TableScan
+                  alias: s1
+                  Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                    Merge Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 key (type: int)
+                        1 key (type: int)
+                      Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: count()
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from empty s1 join tab s3 on s1.key=s3.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@empty
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from empty s1 join tab s3 on s1.key=s3.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@empty
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+#### A masked pattern was here ####
+0