You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/11/05 23:53:27 UTC

svn commit: r1636999 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java test/results/clientpositive/tez/vector_decimal_3.q.out

Author: gunther
Date: Wed Nov  5 22:53:27 2014
New Revision: 1636999

URL: http://svn.apache.org/r1636999
Log:
HIVE-8724: Right outer join produces incorrect result on Tez (Gunther Hagleitner, reviewed by Harish Butani and Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
    hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1636999&r1=1636998&r2=1636999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Wed Nov  5 22:53:27 2014
@@ -134,6 +134,19 @@ public class CommonMergeJoinOperator ext
     sources = ((TezContext) MapredContext.get()).getRecordSources();
   }
 
+  @Override
+  public void endGroup() throws HiveException {
+    // we do not want the end group to cause a checkAndGenObject
+    defaultEndGroup();
+  }
+
+  @Override
+  public void startGroup() throws HiveException {
+    // we do not want the start group to clear the storage
+    defaultStartGroup();
+  }
+
+
   /*
    * (non-Javadoc)
    *
@@ -275,7 +288,7 @@ public class CommonMergeJoinOperator ext
     if (foundNextKeyGroup[t]) {
       // first promote the next group to be the current group if we reached a
       // new group in the previous fetch
-      if ((this.nextKeyWritables[t] != null) || (this.fetchDone[t] == false)) {
+      if (this.nextKeyWritables[t] != null) {
         promoteNextGroupToCandidate(t);
       } else {
         this.keyWritables[t] = null;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1636999&r1=1636998&r2=1636999&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Wed Nov  5 22:53:27 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -73,10 +74,10 @@ public class ReduceRecordSource implemen
   // for different tags
   private SerDe inputValueDeserializer;
 
-  TableDesc keyTableDesc;
-  TableDesc valueTableDesc;
+  private TableDesc keyTableDesc;
+  private TableDesc valueTableDesc;
 
-  ObjectInspector rowObjectInspector;
+  private ObjectInspector rowObjectInspector;
   private Operator<?> reducer;
 
   private Object keyObject = null;
@@ -84,8 +85,6 @@ public class ReduceRecordSource implemen
 
   private boolean vectorized = false;
 
-  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
-
   private DataOutputBuffer keyBuffer;
   private DataOutputBuffer valueBuffer;
   private VectorizedRowBatchCtx batchContext;
@@ -111,7 +110,7 @@ public class ReduceRecordSource implemen
 
   private Iterable<Object> valueWritables;
   
-  private final boolean grouped = true;
+  private final GroupIterator groupIterator = new GroupIterator();
 
   void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
       TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag,
@@ -207,13 +206,19 @@ public class ReduceRecordSource implemen
   
   @Override
   public final boolean isGrouped() {
-    return grouped;
+    return vectorized;
   }
 
   @Override
   public boolean pushRecord() throws HiveException {
     BytesWritable keyWritable;
 
+    if (!vectorized && groupIterator.hasNext()) {
+      // if we have records left in the group we push one of those
+      groupIterator.next();
+      return true;
+    }
+
     try {
       if (!reader.next()) {
         return false;
@@ -245,11 +250,13 @@ public class ReduceRecordSource implemen
         reducer.setGroupKeyObject(keyObject);
       }
 
-      /* this.keyObject passed via reference */
       if(vectorized) {
         processVectors(valueWritables, tag);
       } else {
-        processKeyValues(valueWritables, tag);
+        groupIterator.initialize(valueWritables, keyObject, tag);
+        if (groupIterator.hasNext()) {
+          groupIterator.next(); // push first record of group
+        }
       }
       return true;
     } catch (Throwable e) {
@@ -279,16 +286,29 @@ public class ReduceRecordSource implemen
     }
   }
 
-  /**
-   * @param values
-   * @return true if it is not done and can take more inputs
-   */
-  private void processKeyValues(Iterable<Object> values, byte tag) throws HiveException {
-    List<Object> passDownKey = null;
-    for (Object value : values) {
-      BytesWritable valueWritable = (BytesWritable) value;
+  private class GroupIterator {
+    private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+    private List<Object> passDownKey = null;
+    private Iterator<Object> values;
+    private byte tag;
+    private Object keyObject;
+
+    public void initialize(Iterable<Object> values, Object keyObject, byte tag) {
+      this.passDownKey = null;
+      this.values = values.iterator();
+      this.tag = tag;
+      this.keyObject = keyObject;
+    }
 
+    public boolean hasNext() {
+      return values != null && values.hasNext();
+    }
+
+    public void next() throws HiveException {
       row.clear();
+      Object value = values.next();
+      BytesWritable valueWritable = (BytesWritable) value;
+
       if (passDownKey == null) {
         row.add(this.keyObject);
       } else {
@@ -387,7 +407,6 @@ public class ReduceRecordSource implemen
     } catch (Exception e) {
       if (!abort) {
         // signal new failure to map-reduce
-        l4j.error("Hit error while closing operators - failing tree");
         throw new RuntimeException("Hive Runtime Error while closing operators: "
             + e.getMessage(), e);
       }

Modified: hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out?rev=1636999&r1=1636998&r2=1636999&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/tez/vector_decimal_3.q.out Wed Nov  5 22:53:27 2014
@@ -295,8 +295,14 @@ POSTHOOK: Input: default@decimal_3
 -0.33	0	-0.33	0
 -0.3	0	-0.3	0
 0.000000000000000000	0	0.000000000000000000	0
+0.000000000000000000	0	0	0
+0.000000000000000000	0	0	0
+0	0	0	0
 0	0	0.000000000000000000	0
+0	0	0	0
 0	0	0.000000000000000000	0
+0	0	0	0
+0	0	0	0
 0.01	0	0.01	0
 0.02	0	0.02	0
 0.1	0	0.1	0
@@ -305,8 +311,14 @@ POSTHOOK: Input: default@decimal_3
 0.33	0	0.33	0
 0.333	0	0.333	0
 1	1	1	1
+1	1	1.0	1
+1	1	1.000000000000000000	1
+1.0	1	1.000000000000000000	1
+1.0	1	1.0	1
 1.0	1	1	1
+1.000000000000000000	1	1.000000000000000000	1
 1.000000000000000000	1	1	1
+1.000000000000000000	1	1.0	1
 1.12	1	1.12	1
 1.122	1	1.122	1
 2	2	2	2
@@ -322,9 +334,13 @@ POSTHOOK: Input: default@decimal_3
 3.14	3	3.14	3
 3.14	3	3.14	3
 3.14	3	3.14	3
+3.14	3	3.140	4
+3.14	3	3.140	4
+3.14	3	3.140	4
 3.140	4	3.14	3
 3.140	4	3.14	3
 3.140	4	3.14	3
+3.140	4	3.140	4
 10	10	10	10
 20	20	20	20
 100	100	100	100