You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2015/05/13 21:17:46 UTC

[4/4] hive git commit: HIVE-10542: Full outer joins in tez produce incorrect results in certain cases (Vikram Dixit K, reviewed by Gunther Hagleitner)

HIVE-10542: Full outer joins in tez produce incorrect results in certain cases (Vikram Dixit K, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/db56e8e3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/db56e8e3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/db56e8e3

Branch: refs/heads/branch-1.0
Commit: db56e8e3d0285068c789fdb3402d99aee41f553f
Parents: 5a9fddb
Author: vikram <vi...@hortonworks.com>
Authored: Wed May 13 12:17:18 2015 -0700
Committer: vikram <vi...@hortonworks.com>
Committed: Wed May 13 12:17:18 2015 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |    1 +
 .../hive/ql/exec/CommonMergeJoinOperator.java   |   55 +-
 ql/src/test/queries/clientpositive/mergejoin.q  |  124 +
 .../test/results/clientpositive/mergejoin.q.out | 3235 ++++++++++++++++++
 .../clientpositive/tez/auto_join29.q.out        |  500 +++
 .../results/clientpositive/tez/mergejoin.q.out  | 3184 +++++++++++++++++
 6 files changed, 7098 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/db56e8e3/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 7898d81..8f96826 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -107,6 +107,7 @@ minitez.query.files.shared=alter_merge_2_orc.q,\
   mapreduce2.q,\
   merge1.q,\
   merge2.q,\
+  mergejoin.q,\
   metadataonly1.q,\
   metadata_only_queries.q,\
   optimize_nullscan.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/db56e8e3/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 1da8933..fb97774 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +35,8 @@ import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
 import org.apache.hadoop.hive.ql.exec.tez.TezContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -78,6 +82,7 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
   transient RecordSource[] sources;
   transient List<Operator<? extends OperatorDesc>> originalParents =
       new ArrayList<Operator<? extends OperatorDesc>>();
+  transient Set<Integer> fetchInputAtClose;
 
   public CommonMergeJoinOperator() {
     super();
@@ -88,6 +93,8 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
   public void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     firstFetchHappened = false;
+    fetchInputAtClose = getFetchInputAtCloseList();
+
     initializeChildren(hconf);
     int maxAlias = 0;
     for (byte pos = 0; pos < order.length; pos++) {
@@ -134,6 +141,25 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
     sources = ((TezContext) MapredContext.get()).getRecordSources();
   }
 
+  /*
+   * Builds the set of input tags (left and right of every join condition) to fetch at close. In
+   * outer joins, records must be pushed through even after one side is done sending records. For
+   * example, in a full outer join the right side must keep sending data after the left side has
+   * sent all of its records. The set is computed once at initialize time; at close, these tags
+   * keep forwarding records until they have no more to send. Subsequent joins must fetch their
+   * data as well, since any join following the outer join could produce results with one of the
+   * outer sides, depending on the join condition. Inner joins could be optimized in the future.
+   */
+  private Set<Integer> getFetchInputAtCloseList() {
+    Set<Integer> retval = new HashSet<Integer>();
+    for (JoinCondDesc joinCondDesc : conf.getConds()) {
+      retval.add(joinCondDesc.getLeft());
+      retval.add(joinCondDesc.getRight());
+    }
+
+    return retval;
+  }
+
   @Override
   public void endGroup() throws HiveException {
     // we do not want the end group to cause a checkAndGenObject
@@ -162,7 +188,6 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
     List<Object> value = getFilteredValue(alias, row);
     // compute keys and values as StandardObjects
     List<Object> key = mergeJoinComputeKeys(row, alias);
-
     if (!firstFetchHappened) {
       firstFetchHappened = true;
       // fetch the first group for all small table aliases
@@ -359,9 +384,37 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
 
     while (!allFetchDone) {
       List<Byte> ret = joinOneGroup();
+      for (int i = 0; i < fetchDone.length; i++) {
+        // if the fetch is not completed for the big table
+        if (i == posBigTable) {
+          // if we are in close op phase, we have definitely exhausted the big table input
+          fetchDone[i] = true;
+          continue;
+        }
+
+        // In case of outer joins, we need to pull in records from the sides we still
+        // need to produce output for, apart from the big table (e.g. full outer join).
+        if ((fetchInputAtClose.contains(i)) && (fetchDone[i] == false)) {
+          // if we have never fetched, we need to fetch before we can do the join
+          if (firstFetchHappened == false) {
+            // we need to fetch all the needed ones at least once to ensure bootstrapping
+            if (i == (fetchDone.length - 1)) {
+              firstFetchHappened = true;
+            }
+            // This is a bootstrap. The joinOneGroup automatically fetches the next rows.
+            fetchNextGroup((byte) i);
+          }
+          // Do the join. It does fetching of next row groups itself.
+          if (i == (fetchDone.length - 1)) {
+            ret = joinOneGroup();
+          }
+        }
+      }
+
       if (ret == null || ret.size() == 0) {
         break;
       }
+
       reportProgress();
       numMapRowsRead++;
       allFetchDone = allFetchDone();

http://git-wip-us.apache.org/repos/asf/hive/blob/db56e8e3/ql/src/test/queries/clientpositive/mergejoin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mergejoin.q b/ql/src/test/queries/clientpositive/mergejoin.q
new file mode 100644
index 0000000..59374ca
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/mergejoin.q
@@ -0,0 +1,124 @@
+set hive.join.emit.interval=100000;
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.vectorized.execution.enabled=true;
+
+-- SORT_QUERY_RESULTS
+
+explain
+select * from src a join src1 b on a.key = b.key;
+
+select * from src a join src1 b on a.key = b.key;
+
+
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS ORCFILE;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting = true;
+set hive.optimize.bucketingsorting=false;
+insert overwrite table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS ORCFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+explain
+select count(*)
+from tab a join tab_part b on a.key = b.key;
+
+select * from tab a join tab_part b on a.key = b.key;
+
+set hive.join.emit.interval=2;
+
+select * from tab a join tab_part b on a.key = b.key;
+
+explain
+select count(*)
+from tab a left outer join tab_part b on a.key = b.key;
+
+select count(*)
+from tab a left outer join tab_part b on a.key = b.key;
+
+explain
+select count (*)
+from tab a right outer join tab_part b on a.key = b.key;
+
+select count (*)
+from tab a right outer join tab_part b on a.key = b.key;
+
+explain
+select count(*)
+from tab a full outer join tab_part b on a.key = b.key;
+
+select count(*)
+from tab a full outer join tab_part b on a.key = b.key;
+
+explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+
+explain select count(*) from tab a join tab_part b on a.value = b.value;
+select count(*) from tab a join tab_part b on a.value = b.value;
+
+explain
+select count(*) from (select s1.key as key, s1.value as value from tab s1 join tab s3 on s1.key=s3.key
+UNION  ALL
+select s2.key as key, s2.value as value from tab s2
+) a join tab_part b on (a.key = b.key);
+
+explain select count(*) from tab a join tab_part b on a.value = b.value;
+select count(*) from tab a join tab_part b on a.value = b.value;
+
+explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+
+explain
+select count(*) from (select s1.key as key, s1.value as value from tab s1 join tab s3 on s1.key=s3.key
+UNION  ALL
+select s2.key as key, s2.value as value from tab s2
+) a join tab_part b on (a.key = b.key);
+
+explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+
+set mapred.reduce.tasks=18;
+select * from (select * from tab where tab.key = 0)a full outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key;
+select * from (select * from tab where tab.key = 0)a right outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key;
+
+select * from
+(select * from tab where tab.key = 0)a
+full outer join
+(select * from tab_part where tab_part.key = 98)b join tab_part c on a.key = b.key and b.key = c.key;
+
+select * from
+(select * from tab where tab.key = 0)a
+join
+(select * from tab_part where tab_part.key = 98)b full outer join tab_part c on a.key = b.key and b.key = c.key;
+