You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/20 02:08:52 UTC

svn commit: r1570026 - /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java

Author: daijy
Date: Thu Feb 20 01:08:52 2014
New Revision: 1570026

URL: http://svn.apache.org/r1570026
Log:
PIG-3761: Outer join fails on Tez

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1570026&r1=1570025&r2=1570026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Thu Feb 20 01:08:52 2014
@@ -106,6 +106,7 @@ public class POShuffleTezLoad extends PO
             boolean hasData = false;
             Object cur = null;
             PigNullableWritable min = null;
+            int minIndex = -1;
 
             try {
                 for (int i = 0; i < numInputs; i++) {
@@ -114,6 +115,7 @@ public class POShuffleTezLoad extends PO
                         cur = readers.get(i).getCurrentKey();
                         if (min == null || comparator.compare(min, cur) > 0) {
                             min = ((PigNullableWritable)cur).clone();
+                            minIndex = i;
                         }
                     }
                 }
@@ -139,7 +141,8 @@ public class POShuffleTezLoad extends PO
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
                         // We need to loop in case of Grouping Comparators
-                        while (comparator.compare(min, cur) == 0) {
+                        while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
+                                min.isNull() && i==minIndex)) {
                             Iterable<Object> vals = readers.get(i).getCurrentValues();
                             if (isAccumulative()) {
                                 // TODO: POPackageTupleBuffer expects the