You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@hive.apache.org by se...@apache.org on 2015/07/08 03:57:23 UTC
[30/31] hive git commit: HIVE-11016 : MiniTez mergejoin test fails with Tez input error (issue in merge join under certain conditions) (Sergey Shelukhin, reviewed by Vikram Dixit K)
HIVE-11016 : MiniTez mergejoin test fails with Tez input error (issue in merge join under certain conditions) (Sergey Shelukhin, reviewed by Vikram Dixit K)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/10dc20fb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/10dc20fb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/10dc20fb
Branch: refs/heads/llap
Commit: 10dc20fb9091f30994c2b0093b2c75562411a7a1
Parents: 47b1803
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Jul 7 18:29:40 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Jul 7 18:29:40 2015 -0700
----------------------------------------------------------------------
.../hive/ql/exec/CommonMergeJoinOperator.java | 76 +++++++++-----------
1 file changed, 35 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/10dc20fb/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index d1d5e2b..24af765 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
+import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource;
import org.apache.hadoop.hive.ql.exec.tez.TezContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
@@ -198,15 +199,8 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
List<Object> value = getFilteredValue(alias, row);
// compute keys and values as StandardObjects
List<Object> key = mergeJoinComputeKeys(row, alias);
- if (!firstFetchHappened) {
- firstFetchHappened = true;
- // fetch the first group for all small table aliases
- for (byte pos = 0; pos < order.length; pos++) {
- if (pos != posBigTable) {
- fetchNextGroup(pos);
- }
- }
- }
+ // Fetch the first group for all small table aliases.
+ doFirstFetchIfNeeded();
//have we reached a new key group?
boolean nextKeyGroup = processKey(alias, key);
@@ -392,6 +386,7 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
super.closeOp(abort);
// clean up
+ LOG.debug("Cleaning up the operator state");
for (int pos = 0; pos < order.length; pos++) {
if (pos != posBigTable) {
fetchDone[pos] = false;
@@ -402,7 +397,11 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
private void fetchOneRow(byte tag) throws HiveException {
try {
- fetchDone[tag] = !sources[tag].pushRecord();
+ boolean hasMore = sources[tag].pushRecord();
+ if (fetchDone[tag] && hasMore) {
+ LOG.warn("fetchDone[" + tag + "] was set to true (by a recursive call) and will be reset");
+ }// TODO: "else {"? This happened in the past due to a bug, see HIVE-11016.
+ fetchDone[tag] = !hasMore;
if (sources[tag].isGrouped()) {
// instead of maintaining complex state for the fetch of the next group,
// we know for sure that at the end of all the values for a given key,
@@ -429,31 +428,20 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
while (!allFetchDone) {
List<Byte> ret = joinOneGroup();
- for (int i = 0; i < fetchDone.length; i++) {
- // if the fetch is not completed for the big table
- if (i == posBigTable) {
- // if we are in close op phase, we have definitely exhausted the big table input
- fetchDone[i] = true;
- continue;
- }
-
- // in case of outer joins, we need to pull in records from the sides we still
- // need to produce output for apart from the big table. for e.g. full outer join
- if ((fetchInputAtClose.contains(i)) && (fetchDone[i] == false)) {
- // if we have never fetched, we need to fetch before we can do the join
- if (firstFetchHappened == false) {
- // we need to fetch all the needed ones at least once to ensure bootstrapping
- if (i == (fetchDone.length - 1)) {
- firstFetchHappened = true;
- }
- // This is a bootstrap. The joinOneGroup automatically fetches the next rows.
- fetchNextGroup((byte) i);
- }
- // Do the join. It does fetching of next row groups itself.
- if (i == (fetchDone.length - 1)) {
- ret = joinOneGroup();
- }
- }
+ // if we are in close op phase, we have definitely exhausted the big table input
+ fetchDone[posBigTable] = true;
+ // First, handle the condition where the first fetch was never done (big table is empty).
+ doFirstFetchIfNeeded();
+ // in case of outer joins, we need to pull in records from the sides we still
+ // need to produce output for apart from the big table. for e.g. full outer join
+ // TODO: this reproduces the logic of the loop that was here before, assuming
+ // firstFetchHappened == true. In reality it almost always calls joinOneGroup. Fix it?
+ int lastPos = (fetchDone.length - 1);
+ if (posBigTable != lastPos
+ && (fetchInputAtClose.contains(lastPos)) && (fetchDone[lastPos] == false)) {
+ // Do the join. It does fetching of next row groups itself.
+ LOG.debug("Calling joinOneGroup once again");
+ ret = joinOneGroup();
}
if (ret == null || ret.size() == 0) {
@@ -486,15 +474,21 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
}
}
- private boolean allFetchDone() {
- boolean allFetchDone = true;
+ private void doFirstFetchIfNeeded() throws HiveException {
+ if (firstFetchHappened) return;
+ firstFetchHappened = true;
for (byte pos = 0; pos < order.length; pos++) {
- if (pos == posBigTable) {
- continue;
+ if (pos != posBigTable) {
+ fetchNextGroup(pos);
}
- allFetchDone = allFetchDone && fetchDone[pos];
}
- return allFetchDone;
+ }
+
+ private boolean allFetchDone() {
+ for (byte pos = 0; pos < order.length; pos++) {
+ if (pos != posBigTable && !fetchDone[pos]) return false;
+ }
+ return true;
}
private void promoteNextGroupToCandidate(Byte t) throws HiveException {