You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/11/20 22:46:52 UTC
[12/12] hive git commit: HIVE-12437: SMB join in tez fails when one of the tables is empty (Vikram Dixit K, reviewed by Siddharth Seth)
HIVE-12437: SMB join in tez fails when one of the tables is empty (Vikram Dixit K, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a13e5fb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a13e5fb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a13e5fb
Branch: refs/heads/master-fixed
Commit: 4a13e5fb24837820cb6cdd1093bc8920c639b122
Parents: 39bd58b
Author: vikram <vi...@hortonworks.com>
Authored: Fri Nov 20 13:14:58 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri Nov 20 13:46:23 2015 -0800
----------------------------------------------------------------------
.../hive/ql/exec/tez/CustomPartitionVertex.java | 9 +-
.../hive/ql/exec/tez/MapRecordProcessor.java | 14 +-
.../test/queries/clientpositive/tez_smb_empty.q | 13 ++
.../clientpositive/tez/tez_smb_empty.q.out | 168 +++++++++++++++++++
4 files changed, 200 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4a13e5fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index fe1ef37..e9c14b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -319,9 +319,9 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
// the bucket to task map should have been setup by the big table.
LOG.info("Processing events for input " + inputName);
- if (bucketToTaskMap.isEmpty()) {
- LOG.info("We don't have a routing table yet. Will need to wait for the main input"
- + " initialization");
+ if (inputNameInputSpecMap.get(mainWorkName) == null) {
+ LOG.info("We don't have a routing table yet. Will need to wait for the main input "
+ + mainWorkName + " initialization");
inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
return;
}
@@ -351,6 +351,9 @@ public class CustomPartitionVertex extends VertexManagerPlugin {
for (Entry<Integer, Collection<ByteBuffer>> entry : bucketToSerializedSplitMap.asMap().entrySet()) {
Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
+ if ((destTasks == null) || (destTasks.isEmpty())) {
+ continue;
+ }
for (Integer task : destTasks) {
int count = 0;
for (ByteBuffer buf : entry.getValue()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/4a13e5fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 1d645a0..914b4e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -201,7 +201,19 @@ public class MapRecordProcessor extends RecordProcessor {
jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
mergeMapOp.initialize(jconf, null);
// if there are no files/partitions to read, we need to skip trying to read
- boolean skipRead = mergeMapOp.getConf().getPathToAliases().isEmpty();
+ MultiMRInput multiMRInput = multiMRInputMap.get(mergeMapWork.getName());
+ boolean skipRead = false;
+ if (multiMRInput == null) {
+ l4j.info("Multi MR Input for work " + mergeMapWork.getName() + " is null. Skipping read.");
+ skipRead = true;
+ } else {
+ Collection<KeyValueReader> keyValueReaders = multiMRInput.getKeyValueReaders();
+ if ((keyValueReaders == null) || (keyValueReaders.isEmpty())) {
+ l4j.info("Key value readers are null or empty and hence skipping read. "
+ + "KeyValueReaders = " + keyValueReaders);
+ skipRead = true;
+ }
+ }
if (skipRead) {
List<Operator<?>> children = new ArrayList<Operator<?>>();
children.addAll(mergeMapOp.getConf().getAliasToWork().values());
http://git-wip-us.apache.org/repos/asf/hive/blob/4a13e5fb/ql/src/test/queries/clientpositive/tez_smb_empty.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_empty.q b/ql/src/test/queries/clientpositive/tez_smb_empty.q
index 196cc97..2427377 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_empty.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_empty.q
@@ -53,3 +53,16 @@ explain
select count(*) from tab s1 left outer join empty s2 on s1.key=s2.key join tab s3 on s1.key = s3.key;
select count(*) from tab s1 left outer join empty s2 on s1.key=s2.key join tab s3 on s1.key = s3.key;
+
+explain
+select count(*) from empty s1 join empty s3 on s1.key=s3.key;
+
+select count(*) from empty s1 join empty s3 on s1.key=s3.key;
+
+set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ;
+
+explain
+select count(*) from empty s1 join tab s3 on s1.key=s3.key;
+
+select count(*) from empty s1 join tab s3 on s1.key=s3.key;
+
http://git-wip-us.apache.org/repos/asf/hive/blob/4a13e5fb/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out b/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
index 82ec31d..a32fb12 100644
--- a/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
+++ b/ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
@@ -674,3 +674,171 @@ POSTHOOK: Input: default@tab
POSTHOOK: Input: default@tab@ds=2008-04-08
#### A masked pattern was here ####
480
+PREHOOK: query: explain
+select count(*) from empty s1 join empty s3 on s1.key=s3.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from empty s1 join empty s3 on s1.key=s3.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: s1
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Select Operator
+ expressions: key (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Map Operator Tree:
+ TableScan
+ alias: s1
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Select Operator
+ expressions: key (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 _col0 (type: int)
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 2
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from empty s1 join empty s3 on s1.key=s3.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@empty
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from empty s1 join empty s3 on s1.key=s3.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@empty
+#### A masked pattern was here ####
+0
+PREHOOK: query: explain
+select count(*) from empty s1 join tab s3 on s1.key=s3.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from empty s1 join tab s3 on s1.key=s3.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: s3
+ Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Map Operator Tree:
+ TableScan
+ alias: s1
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 key (type: int)
+ 1 key (type: int)
+ Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 2
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from empty s1 join tab s3 on s1.key=s3.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@empty
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from empty s1 join tab s3 on s1.key=s3.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@empty
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+#### A masked pattern was here ####
+0