You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@hive.apache.org by vi...@apache.org on 2015/05/13 21:17:46 UTC
[4/4] hive git commit: HIVE-10542: Full outer joins in tez produce incorrect results in certain cases (Vikram Dixit K, reviewed by Gunther Hagleitner)
HIVE-10542: Full outer joins in tez produce incorrect results in certain cases (Vikram Dixit K, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/db56e8e3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/db56e8e3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/db56e8e3
Branch: refs/heads/branch-1.0
Commit: db56e8e3d0285068c789fdb3402d99aee41f553f
Parents: 5a9fddb
Author: vikram <vi...@hortonworks.com>
Authored: Wed May 13 12:17:18 2015 -0700
Committer: vikram <vi...@hortonworks.com>
Committed: Wed May 13 12:17:18 2015 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 1 +
.../hive/ql/exec/CommonMergeJoinOperator.java | 55 +-
ql/src/test/queries/clientpositive/mergejoin.q | 124 +
.../test/results/clientpositive/mergejoin.q.out | 3235 ++++++++++++++++++
.../clientpositive/tez/auto_join29.q.out | 500 +++
.../results/clientpositive/tez/mergejoin.q.out | 3184 +++++++++++++++++
6 files changed, 7098 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/db56e8e3/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 7898d81..8f96826 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -107,6 +107,7 @@ minitez.query.files.shared=alter_merge_2_orc.q,\
mapreduce2.q,\
merge1.q,\
merge2.q,\
+ mergejoin.q,\
metadataonly1.q,\
metadata_only_queries.q,\
optimize_nullscan.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/db56e8e3/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 1da8933..fb97774 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +35,8 @@ import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
import org.apache.hadoop.hive.ql.exec.tez.TezContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -78,6 +82,7 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
transient RecordSource[] sources;
transient List<Operator<? extends OperatorDesc>> originalParents =
new ArrayList<Operator<? extends OperatorDesc>>();
+ transient Set<Integer> fetchInputAtClose;
public CommonMergeJoinOperator() {
super();
@@ -88,6 +93,8 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
public void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
firstFetchHappened = false;
+ fetchInputAtClose = getFetchInputAtCloseList();
+
initializeChildren(hconf);
int maxAlias = 0;
for (byte pos = 0; pos < order.length; pos++) {
@@ -134,6 +141,25 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
sources = ((TezContext) MapredContext.get()).getRecordSources();
}
+ /**
+ * Returns the input positions (tags) that must keep fetching at close. For outer joins, records
+ * must be pushed through even after one side is done: e.g. in a full outer join the right side
+ * must still send data after the left side has sent all of its records. Computed once at
+ * initialize time; at close these tags keep forwarding records until they have none left.
+ * Joins following an outer join must fetch their data too, since depending on the join
+ * condition they may produce results involving an outer side. Every position of every join
+ * condition is added (inner joins included); inner joins could be pruned here in the future.
+ */
+ private Set<Integer> getFetchInputAtCloseList() {
+ Set<Integer> retval = new HashSet<Integer>();
+ for (JoinCondDesc joinCondDesc : conf.getConds()) {
+ retval.add(joinCondDesc.getLeft());
+ retval.add(joinCondDesc.getRight());
+ }
+
+ return retval;
+ }
+
@Override
public void endGroup() throws HiveException {
// we do not want the end group to cause a checkAndGenObject
@@ -162,7 +188,6 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
List<Object> value = getFilteredValue(alias, row);
// compute keys and values as StandardObjects
List<Object> key = mergeJoinComputeKeys(row, alias);
-
if (!firstFetchHappened) {
firstFetchHappened = true;
// fetch the first group for all small table aliases
@@ -359,9 +384,37 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
while (!allFetchDone) {
List<Byte> ret = joinOneGroup();
+ for (int i = 0; i < fetchDone.length; i++) {
+ // if the fetch is not completed for the big table
+ if (i == posBigTable) {
+ // if we are in close op phase, we have definitely exhausted the big table input
+ fetchDone[i] = true;
+ continue;
+ }
+
+ // in case of outer joins, we need to pull in records from the sides we still
+ // need to produce output for apart from the big table. for e.g. full outer join
+ if ((fetchInputAtClose.contains(i)) && (fetchDone[i] == false)) {
+ // if we have never fetched, we need to fetch before we can do the join
+ if (firstFetchHappened == false) {
+ // we need to fetch all the needed ones at least once to ensure bootstrapping
+ if (i == (fetchDone.length - 1)) {
+ firstFetchHappened = true;
+ }
+ // This is a bootstrap. The joinOneGroup automatically fetches the next rows.
+ fetchNextGroup((byte) i);
+ }
+ // Do the join. It does fetching of next row groups itself.
+ if (i == (fetchDone.length - 1)) {
+ ret = joinOneGroup();
+ }
+ }
+ }
+
if (ret == null || ret.size() == 0) {
break;
}
+
reportProgress();
numMapRowsRead++;
allFetchDone = allFetchDone();
http://git-wip-us.apache.org/repos/asf/hive/blob/db56e8e3/ql/src/test/queries/clientpositive/mergejoin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mergejoin.q b/ql/src/test/queries/clientpositive/mergejoin.q
new file mode 100644
index 0000000..59374ca
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/mergejoin.q
@@ -0,0 +1,124 @@
+set hive.join.emit.interval=100000;
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.vectorized.execution.enabled=true;
+
+-- SORT_QUERY_RESULTS
+
+explain
+select * from src a join src1 b on a.key = b.key;
+
+select * from src a join src1 b on a.key = b.key;
+
+
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS ORCFILE;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting = true;
+set hive.optimize.bucketingsorting=false;
+insert overwrite table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS ORCFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+explain
+select count(*)
+from tab a join tab_part b on a.key = b.key;
+
+select * from tab a join tab_part b on a.key = b.key;
+
+set hive.join.emit.interval=2;
+
+select * from tab a join tab_part b on a.key = b.key;
+
+explain
+select count(*)
+from tab a left outer join tab_part b on a.key = b.key;
+
+select count(*)
+from tab a left outer join tab_part b on a.key = b.key;
+
+explain
+select count (*)
+from tab a right outer join tab_part b on a.key = b.key;
+
+select count (*)
+from tab a right outer join tab_part b on a.key = b.key;
+
+explain
+select count(*)
+from tab a full outer join tab_part b on a.key = b.key;
+
+select count(*)
+from tab a full outer join tab_part b on a.key = b.key;
+
+explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+
+explain select count(*) from tab a join tab_part b on a.value = b.value;
+select count(*) from tab a join tab_part b on a.value = b.value;
+
+explain
+select count(*) from (select s1.key as key, s1.value as value from tab s1 join tab s3 on s1.key=s3.key
+UNION ALL
+select s2.key as key, s2.value as value from tab s2
+) a join tab_part b on (a.key = b.key);
+
+explain select count(*) from tab a join tab_part b on a.value = b.value;
+select count(*) from tab a join tab_part b on a.value = b.value;
+
+explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+
+explain
+select count(*) from (select s1.key as key, s1.value as value from tab s1 join tab s3 on s1.key=s3.key
+UNION ALL
+select s2.key as key, s2.value as value from tab s2
+) a join tab_part b on (a.key = b.key);
+
+explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+
+set mapred.reduce.tasks=18;
+select * from (select * from tab where tab.key = 0)a full outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key;
+select * from (select * from tab where tab.key = 0)a right outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key;
+
+select * from
+(select * from tab where tab.key = 0)a
+full outer join
+(select * from tab_part where tab_part.key = 98)b join tab_part c on a.key = b.key and b.key = c.key;
+
+select * from
+(select * from tab where tab.key = 0)a
+join
+(select * from tab_part where tab_part.key = 98)b full outer join tab_part c on a.key = b.key and b.key = c.key;
+