You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/07/08 03:36:59 UTC

hive git commit: HIVE-11016 : MiniTez mergejoin test fails with Tez input error (issue in merge join under certain conditions) (Sergey Shelukhin, reviewed by Vikram Dixit K)

Repository: hive
Updated Branches:
  refs/heads/master 47b18039d -> 10dc20fb9


HIVE-11016 : MiniTez mergejoin test fails with Tez input error (issue in merge join under certain conditions) (Sergey Shelukhin, reviewed by Vikram Dixit K)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/10dc20fb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/10dc20fb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/10dc20fb

Branch: refs/heads/master
Commit: 10dc20fb9091f30994c2b0093b2c75562411a7a1
Parents: 47b1803
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Jul 7 18:29:40 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Jul 7 18:29:40 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/CommonMergeJoinOperator.java   | 76 +++++++++-----------
 1 file changed, 35 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/10dc20fb/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index d1d5e2b..24af765 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
+import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource;
 import org.apache.hadoop.hive.ql.exec.tez.TezContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
@@ -198,15 +199,8 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
     List<Object> value = getFilteredValue(alias, row);
     // compute keys and values as StandardObjects
     List<Object> key = mergeJoinComputeKeys(row, alias);
-    if (!firstFetchHappened) {
-      firstFetchHappened = true;
-      // fetch the first group for all small table aliases
-      for (byte pos = 0; pos < order.length; pos++) {
-        if (pos != posBigTable) {
-          fetchNextGroup(pos);
-        }
-      }
-    }
+    // Fetch the first group for all small table aliases.
+    doFirstFetchIfNeeded();
 
     //have we reached a new key group?
     boolean nextKeyGroup = processKey(alias, key);
@@ -392,6 +386,7 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
     super.closeOp(abort);
 
     // clean up
+    LOG.debug("Cleaning up the operator state");
     for (int pos = 0; pos < order.length; pos++) {
       if (pos != posBigTable) {
         fetchDone[pos] = false;
@@ -402,7 +397,11 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
 
   private void fetchOneRow(byte tag) throws HiveException {
     try {
-      fetchDone[tag] = !sources[tag].pushRecord();
+      boolean hasMore = sources[tag].pushRecord();
+      if (fetchDone[tag] && hasMore) {
+        LOG.warn("fetchDone[" + tag + "] was set to true (by a recursive call) and will be reset");
+      }// TODO: "else {"? This happened in the past due to a bug, see HIVE-11016.
+        fetchDone[tag] = !hasMore;
       if (sources[tag].isGrouped()) {
         // instead of maintaining complex state for the fetch of the next group,
         // we know for sure that at the end of all the values for a given key,
@@ -429,31 +428,20 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
 
     while (!allFetchDone) {
       List<Byte> ret = joinOneGroup();
-      for (int i = 0; i < fetchDone.length; i++) {
-        // if the fetch is not completed for the big table
-        if (i == posBigTable) {
-          // if we are in close op phase, we have definitely exhausted the big table input
-          fetchDone[i] = true;
-          continue;
-        }
-
-        // in case of outer joins, we need to pull in records from the sides we still
-        // need to produce output for apart from the big table. for e.g. full outer join
-        if ((fetchInputAtClose.contains(i)) && (fetchDone[i] == false)) {
-          // if we have never fetched, we need to fetch before we can do the join
-          if (firstFetchHappened == false) {
-            // we need to fetch all the needed ones at least once to ensure bootstrapping
-            if (i == (fetchDone.length - 1)) {
-              firstFetchHappened = true;
-            }
-            // This is a bootstrap. The joinOneGroup automatically fetches the next rows.
-            fetchNextGroup((byte) i);
-          }
-          // Do the join. It does fetching of next row groups itself.
-          if (i == (fetchDone.length - 1)) {
-            ret = joinOneGroup();
-          }
-        }
+      // if we are in close op phase, we have definitely exhausted the big table input
+      fetchDone[posBigTable] = true;
+      // First, handle the condition where the first fetch was never done (big table is empty).
+      doFirstFetchIfNeeded();
+      // in case of outer joins, we need to pull in records from the sides we still
+      // need to produce output for apart from the big table. for e.g. full outer join
+      // TODO: this reproduces the logic of the loop that was here before, assuming
+      // firstFetchHappened == true. In reality it almost always calls joinOneGroup. Fix it?
+      int lastPos = (fetchDone.length - 1);
+      if (posBigTable != lastPos
+          && (fetchInputAtClose.contains(lastPos)) && (fetchDone[lastPos] == false)) {
+        // Do the join. It does fetching of next row groups itself.
+        LOG.debug("Calling joinOneGroup once again");
+        ret = joinOneGroup();
       }
 
       if (ret == null || ret.size() == 0) {
@@ -486,15 +474,21 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
     }
   }
 
-  private boolean allFetchDone() {
-    boolean allFetchDone = true;
+  private void doFirstFetchIfNeeded() throws HiveException {
+    if (firstFetchHappened) return;
+    firstFetchHappened = true;
     for (byte pos = 0; pos < order.length; pos++) {
-      if (pos == posBigTable) {
-        continue;
+      if (pos != posBigTable) {
+        fetchNextGroup(pos);
       }
-      allFetchDone = allFetchDone && fetchDone[pos];
     }
-    return allFetchDone;
+  }
+
+  private boolean allFetchDone() {
+    for (byte pos = 0; pos < order.length; pos++) {
+      if (pos != posBigTable && !fetchDone[pos]) return false;
+    }
+    return true;
   }
 
   private void promoteNextGroupToCandidate(Byte t) throws HiveException {