You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/11/05 23:53:27 UTC
svn commit: r1636999 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
test/results/clientpositive/tez/vector_decimal_3.q.out
Author: gunther
Date: Wed Nov 5 22:53:27 2014
New Revision: 1636999
URL: http://svn.apache.org/r1636999
Log:
HIVE-8724: Right outer join produces incorrect result on Tez (Gunther Hagleitner, reviewed by Harish Butani and Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1636999&r1=1636998&r2=1636999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Wed Nov 5 22:53:27 2014
@@ -134,6 +134,19 @@ public class CommonMergeJoinOperator ext
sources = ((TezContext) MapredContext.get()).getRecordSources();
}
+ @Override
+ public void endGroup() throws HiveException {
+ // we do not want the end group to cause a checkAndGenObject
+ defaultEndGroup();
+ }
+
+ @Override
+ public void startGroup() throws HiveException {
+ // we do not want the start group to clear the storage
+ defaultStartGroup();
+ }
+
+
/*
* (non-Javadoc)
*
@@ -275,7 +288,7 @@ public class CommonMergeJoinOperator ext
if (foundNextKeyGroup[t]) {
// first promote the next group to be the current group if we reached a
// new group in the previous fetch
- if ((this.nextKeyWritables[t] != null) || (this.fetchDone[t] == false)) {
+ if (this.nextKeyWritables[t] != null) {
promoteNextGroupToCandidate(t);
} else {
this.keyWritables[t] = null;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1636999&r1=1636998&r2=1636999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Wed Nov 5 22:53:27 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.t
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -73,10 +74,10 @@ public class ReduceRecordSource implemen
// for different tags
private SerDe inputValueDeserializer;
- TableDesc keyTableDesc;
- TableDesc valueTableDesc;
+ private TableDesc keyTableDesc;
+ private TableDesc valueTableDesc;
- ObjectInspector rowObjectInspector;
+ private ObjectInspector rowObjectInspector;
private Operator<?> reducer;
private Object keyObject = null;
@@ -84,8 +85,6 @@ public class ReduceRecordSource implemen
private boolean vectorized = false;
- List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
-
private DataOutputBuffer keyBuffer;
private DataOutputBuffer valueBuffer;
private VectorizedRowBatchCtx batchContext;
@@ -111,7 +110,7 @@ public class ReduceRecordSource implemen
private Iterable<Object> valueWritables;
- private final boolean grouped = true;
+ private final GroupIterator groupIterator = new GroupIterator();
void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag,
@@ -207,13 +206,19 @@ public class ReduceRecordSource implemen
@Override
public final boolean isGrouped() {
- return grouped;
+ return vectorized;
}
@Override
public boolean pushRecord() throws HiveException {
BytesWritable keyWritable;
+ if (!vectorized && groupIterator.hasNext()) {
+ // if we have records left in the group we push one of those
+ groupIterator.next();
+ return true;
+ }
+
try {
if (!reader.next()) {
return false;
@@ -245,11 +250,13 @@ public class ReduceRecordSource implemen
reducer.setGroupKeyObject(keyObject);
}
- /* this.keyObject passed via reference */
if(vectorized) {
processVectors(valueWritables, tag);
} else {
- processKeyValues(valueWritables, tag);
+ groupIterator.initialize(valueWritables, keyObject, tag);
+ if (groupIterator.hasNext()) {
+ groupIterator.next(); // push first record of group
+ }
}
return true;
} catch (Throwable e) {
@@ -279,16 +286,29 @@ public class ReduceRecordSource implemen
}
}
- /**
- * @param values
- * @return true if it is not done and can take more inputs
- */
- private void processKeyValues(Iterable<Object> values, byte tag) throws HiveException {
- List<Object> passDownKey = null;
- for (Object value : values) {
- BytesWritable valueWritable = (BytesWritable) value;
+ private class GroupIterator {
+ private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+ private List<Object> passDownKey = null;
+ private Iterator<Object> values;
+ private byte tag;
+ private Object keyObject;
+
+ public void initialize(Iterable<Object> values, Object keyObject, byte tag) {
+ this.passDownKey = null;
+ this.values = values.iterator();
+ this.tag = tag;
+ this.keyObject = keyObject;
+ }
+ public boolean hasNext() {
+ return values != null && values.hasNext();
+ }
+
+ public void next() throws HiveException {
row.clear();
+ Object value = values.next();
+ BytesWritable valueWritable = (BytesWritable) value;
+
if (passDownKey == null) {
row.add(this.keyObject);
} else {
@@ -387,7 +407,6 @@ public class ReduceRecordSource implemen
} catch (Exception e) {
if (!abort) {
// signal new failure to map-reduce
- l4j.error("Hit error while closing operators - failing tree");
throw new RuntimeException("Hive Runtime Error while closing operators: "
+ e.getMessage(), e);
}
Modified: hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out?rev=1636999&r1=1636998&r2=1636999&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out Wed Nov 5 22:53:27 2014
@@ -295,8 +295,14 @@ POSTHOOK: Input: default@decimal_3
-0.33 0 -0.33 0
-0.3 0 -0.3 0
0.000000000000000000 0 0.000000000000000000 0
+0.000000000000000000 0 0 0
+0.000000000000000000 0 0 0
+0 0 0 0
0 0 0.000000000000000000 0
+0 0 0 0
0 0 0.000000000000000000 0
+0 0 0 0
+0 0 0 0
0.01 0 0.01 0
0.02 0 0.02 0
0.1 0 0.1 0
@@ -305,8 +311,14 @@ POSTHOOK: Input: default@decimal_3
0.33 0 0.33 0
0.333 0 0.333 0
1 1 1 1
+1 1 1.0 1
+1 1 1.000000000000000000 1
+1.0 1 1.000000000000000000 1
+1.0 1 1.0 1
1.0 1 1 1
+1.000000000000000000 1 1.000000000000000000 1
1.000000000000000000 1 1 1
+1.000000000000000000 1 1.0 1
1.12 1 1.12 1
1.122 1 1.122 1
2 2 2 2
@@ -322,9 +334,13 @@ POSTHOOK: Input: default@decimal_3
3.14 3 3.14 3
3.14 3 3.14 3
3.14 3 3.14 3
+3.14 3 3.140 4
+3.14 3 3.140 4
+3.14 3 3.140 4
3.140 4 3.14 3
3.140 4 3.14 3
3.140 4 3.14 3
+3.140 4 3.140 4
10 10 10 10
20 20 20 20
100 100 100 100