You are viewing a plain-text version of this content. The canonical link for it is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/08/12 01:01:11 UTC

svn commit: r1695396 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/ test/e2e/pig/tests/

Author: rohini
Date: Tue Aug 11 23:01:10 2015
New Revision: 1695396

URL: http://svn.apache.org/r1695396
Log:
PIG-4627: [Pig on Tez] Self join does not handle null values correctly (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    pig/trunk/test/e2e/pig/tests/multiquery.conf

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 11 23:01:10 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4627: [Pig on Tez] Self join does not handle null values correctly (rohini)
+
 PIG-4644: PORelationToExprProject.clone() is broken (erwaman via rohini)
 
 PIG-4650: ant mvn-deploy target is broken (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java Tue Aug 11 23:01:10 2015
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.hadoop.BigDecimalWritable;
 import org.apache.pig.impl.io.NullableBigDecimalWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -74,8 +73,10 @@ public class PigBigDecimalRawComparator
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -93,8 +94,10 @@ public class PigBigDecimalRawComparator
         if (!ndw1.isNull() && !ndw2.isNull()) {
             rc = ((BigDecimal)ndw1.getValueAsPigType()).compareTo((BigDecimal)ndw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (ndw1.isNull() && ndw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (ndw1.isNull() && ndw2.isNull()) {
+                rc = ndw1.getIndex() - ndw2.getIndex();
+            }
             else if (ndw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java Tue Aug 11 23:01:10 2015
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.hadoop.BigIntegerWritable;
 import org.apache.pig.impl.io.NullableBigIntegerWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -74,8 +73,10 @@ public class PigBigIntegerRawComparator
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -93,8 +94,10 @@ public class PigBigIntegerRawComparator
         if (!ndw1.isNull() && !ndw2.isNull()) {
             rc = ((BigInteger)ndw1.getValueAsPigType()).compareTo((BigInteger)ndw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (ndw1.isNull() && ndw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (ndw1.isNull() && ndw2.isNull()) {
+                rc = ndw1.getIndex() - ndw2.getIndex();
+            }
             else if (ndw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java Tue Aug 11 23:01:10 2015
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.impl.io.NullableBooleanWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
@@ -39,6 +38,7 @@ public class PigBooleanRawComparator ext
         super(NullableBooleanWritable.class);
         mWrappedComp = new BooleanWritable.Comparator();
     }
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -54,6 +54,7 @@ public class PigBooleanRawComparator ext
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -63,6 +64,7 @@ public class PigBooleanRawComparator ext
      * then BooleanWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -72,8 +74,10 @@ public class PigBooleanRawComparator ext
             byte byte2 = b2[s2 + 1];
             rc = (byte1 < byte2) ? -1 : ((byte1 > byte2) ? 1 : 0);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -81,6 +85,7 @@ public class PigBooleanRawComparator ext
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableBooleanWritable nbw1 = (NullableBooleanWritable)o1;
         NullableBooleanWritable nbw2 = (NullableBooleanWritable)o2;
@@ -90,8 +95,10 @@ public class PigBooleanRawComparator ext
         if (!nbw1.isNull() && !nbw2.isNull()) {
             rc = ((Boolean)nbw1.getValueAsPigType()).compareTo((Boolean)nbw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nbw1.isNull() && nbw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nbw1.isNull() && nbw2.isNull()) {
+                rc = nbw1.getIndex() - nbw2.getIndex();
+            }
             else if (nbw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Tue Aug 11 23:01:10 2015
@@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.data.BinInterSedes;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.io.NullableBytesWritable;
@@ -34,13 +33,14 @@ public class PigBytesRawComparator exten
 
     private final Log mLog = LogFactory.getLog(getClass());
     private boolean[] mAsc;
-    private WritableComparator mWrappedComp;
+    private BinInterSedes.BinInterSedesTupleRawComparator mWrappedComp;
 
     public PigBytesRawComparator() {
         super(NullableBytesWritable.class);
         mWrappedComp = new BinInterSedes.BinInterSedesTupleRawComparator();
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +57,7 @@ public class PigBytesRawComparator exten
         ((BinInterSedes.BinInterSedesTupleRawComparator)mWrappedComp).setConf(conf);
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -69,6 +70,7 @@ public class PigBytesRawComparator exten
      *    For non-bytearrays, we use BinInterSedesTupleRawComparator.
      * If either is null, null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -125,11 +127,17 @@ public class PigBytesRawComparator exten
               // Subtract 2, one for null byte and one for index byte. Also, do not reverse the sign
               // of rc when mAsc[0] is false because BinInterSedesTupleRawComparator.compare() already
               // takes that into account.
-              return mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+              rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+              // handle PIG-927. If tuples are equal but any field inside tuple is null,
+              // then we do not merge keys if indices are not same
+              if (rc == 0 && mWrappedComp.hasComparedTupleNull())
+                  rc = b1[s1 + 1] - b2[s2 + 1];
             }
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -137,6 +145,7 @@ public class PigBytesRawComparator exten
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableBytesWritable nbw1 = (NullableBytesWritable)o1;
         NullableBytesWritable nbw2 = (NullableBytesWritable)o2;
@@ -146,8 +155,10 @@ public class PigBytesRawComparator exten
         if (!nbw1.isNull() && !nbw2.isNull()) {
             rc = DataType.compare(nbw1.getValueAsPigType(), nbw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nbw1.isNull() && nbw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nbw1.isNull() && nbw2.isNull()) {
+                rc = nbw1.getIndex() - nbw2.getIndex();
+            }
             else if (nbw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java Tue Aug 11 23:01:10 2015
@@ -20,17 +20,15 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 
-import org.joda.time.DateTime;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.hadoop.DateTimeWritable;
 import org.apache.pig.impl.io.NullableDateTimeWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.joda.time.DateTime;
 
 public class PigDateTimeRawComparator extends WritableComparator implements
         Configurable {
@@ -44,6 +42,7 @@ public class PigDateTimeRawComparator ex
         mWrappedComp = new DateTimeWritable.Comparator();
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[]) ObjectSerializer.deserialize(conf
@@ -59,6 +58,7 @@ public class PigDateTimeRawComparator ex
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -68,6 +68,7 @@ public class PigDateTimeRawComparator ex
      * IntWritable.compare() is used. If both are null then the indices are
      * compared. Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -75,9 +76,10 @@ public class PigDateTimeRawComparator ex
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0)
-                rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0)
                 rc = -1;
             else
@@ -88,6 +90,7 @@ public class PigDateTimeRawComparator ex
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableDateTimeWritable ndtw1 = (NullableDateTimeWritable) o1;
         NullableDateTimeWritable ndtw2 = (NullableDateTimeWritable) o2;
@@ -98,9 +101,10 @@ public class PigDateTimeRawComparator ex
             rc = ((DateTime) ndtw1.getValueAsPigType())
                     .compareTo((DateTime) ndtw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (ndtw1.isNull() && ndtw2.isNull())
-                rc = 0;
+            // Two nulls are equal if indices are same
+            if (ndtw1.isNull() && ndtw2.isNull()) {
+                rc = ndtw1.getIndex() - ndtw2.getIndex();
+            }
             else if (ndtw1.isNull())
                 rc = -1;
             else

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,12 +21,9 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.backend.hadoop.DoubleWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -42,6 +39,7 @@ public class PigDoubleRawComparator exte
         mWrappedComp = new DoubleWritable.Comparator();
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +55,7 @@ public class PigDoubleRawComparator exte
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -66,6 +65,7 @@ public class PigDoubleRawComparator exte
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -73,8 +73,10 @@ public class PigDoubleRawComparator exte
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -82,6 +84,7 @@ public class PigDoubleRawComparator exte
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableDoubleWritable ndw1 = (NullableDoubleWritable)o1;
         NullableDoubleWritable ndw2 = (NullableDoubleWritable)o2;
@@ -91,8 +94,10 @@ public class PigDoubleRawComparator exte
         if (!ndw1.isNull() && !ndw2.isNull()) {
             rc = ((Double)ndw1.getValueAsPigType()).compareTo((Double)ndw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (ndw1.isNull() && ndw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (ndw1.isNull() && ndw2.isNull()) {
+                rc = ndw1.getIndex() - ndw2.getIndex();
+            }
             else if (ndw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,13 +21,10 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.impl.io.NullableFloatWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
@@ -42,6 +39,7 @@ public class PigFloatRawComparator exten
         mWrappedComp = new FloatWritable.Comparator();
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +55,7 @@ public class PigFloatRawComparator exten
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -66,6 +65,7 @@ public class PigFloatRawComparator exten
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -73,8 +73,10 @@ public class PigFloatRawComparator exten
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -82,6 +84,7 @@ public class PigFloatRawComparator exten
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableFloatWritable nfw1 = (NullableFloatWritable)o1;
         NullableFloatWritable nfw2 = (NullableFloatWritable)o2;
@@ -91,8 +94,10 @@ public class PigFloatRawComparator exten
         if (!nfw1.isNull() && !nfw2.isNull()) {
             rc = ((Float)nfw1.getValueAsPigType()).compareTo((Float)nfw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nfw1.isNull() && nfw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nfw1.isNull() && nfw2.isNull()) {
+                rc = nfw1.getIndex() - nfw2.getIndex();
+            }
             else if (nfw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,12 +21,9 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
@@ -39,6 +36,7 @@ public class PigIntRawComparator extends
         super(NullableIntWritable.class);
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -54,6 +52,7 @@ public class PigIntRawComparator extends
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -63,6 +62,7 @@ public class PigIntRawComparator extends
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -72,8 +72,10 @@ public class PigIntRawComparator extends
             int int2 = readInt(b2, s2 + 1);
             rc = (int1 < int2) ? -1 : ((int1 > int2) ? 1 : 0);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -81,6 +83,7 @@ public class PigIntRawComparator extends
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableIntWritable niw1 = (NullableIntWritable)o1;
         NullableIntWritable niw2 = (NullableIntWritable)o2;
@@ -90,8 +93,10 @@ public class PigIntRawComparator extends
         if (!niw1.isNull() && !niw2.isNull()) {
             rc = ((Integer)niw1.getValueAsPigType()).compareTo((Integer)niw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (niw1.isNull() && niw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (niw1.isNull() && niw2.isNull()) {
+                rc = niw1.getIndex() - niw2.getIndex();
+            }
             else if (niw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,22 +21,18 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.pig.impl.io.NullableLongWritable;
 import org.apache.pig.impl.io.NullableLongWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigLongRawComparator extends WritableComparator implements Configurable {
 
-    private final Log mLog = LogFactory.getLog(getClass());
-    private boolean[] mAsc;
-    private LongWritable.Comparator mWrappedComp;
+    protected final Log mLog = LogFactory.getLog(getClass());
+    protected boolean[] mAsc;
+    protected LongWritable.Comparator mWrappedComp;
 
     public PigLongRawComparator() {
         super(NullableLongWritable.class);
@@ -44,6 +40,7 @@ public class PigLongRawComparator extend
     }
 
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -59,6 +56,7 @@ public class PigLongRawComparator extend
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -68,6 +66,7 @@ public class PigLongRawComparator extend
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -75,8 +74,10 @@ public class PigLongRawComparator extend
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -84,6 +85,7 @@ public class PigLongRawComparator extend
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableLongWritable nlw1 = (NullableLongWritable)o1;
         NullableLongWritable nlw2 = (NullableLongWritable)o2;
@@ -93,8 +95,10 @@ public class PigLongRawComparator extend
         if (!nlw1.isNull() && !nlw2.isNull()) {
             rc = ((Long)nlw1.getValueAsPigType()).compareTo((Long)nlw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nlw1.isNull() && nlw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nlw1.isNull() && nlw2.isNull()) {
+                rc = nlw1.getIndex() - nlw2.getIndex();
+            }
             else if (nlw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Tue Aug 11 23:01:10 2015
@@ -21,13 +21,10 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.impl.io.NullableText;
 import org.apache.pig.impl.util.ObjectSerializer;
 
@@ -43,6 +40,7 @@ public class PigTextRawComparator extend
     }
 
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +55,7 @@ public class PigTextRawComparator extend
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -66,6 +65,7 @@ public class PigTextRawComparator extend
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -73,8 +73,10 @@ public class PigTextRawComparator extend
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -82,6 +84,7 @@ public class PigTextRawComparator extend
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableText nt1 = (NullableText)o1;
         NullableText nt2 = (NullableText)o2;
@@ -91,8 +94,10 @@ public class PigTextRawComparator extend
         if (!nt1.isNull() && !nt2.isNull()) {
             rc = ((String)nt1.getValueAsPigType()).compareTo((String)nt2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nt1.isNull() && nt2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nt1.isNull() && nt2.isNull()) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
             else if (nt1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java Tue Aug 11 23:01:10 2015
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BinInterSedes;
 import org.apache.pig.data.DataType;
@@ -46,6 +45,7 @@ public class PigTupleDefaultRawComparato
         super(TupleFactory.getInstance().tupleClass());
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
@@ -62,6 +62,7 @@ public class PigTupleDefaultRawComparato
         mWholeTuple = (mAsc.length == 1);
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -78,9 +79,9 @@ public class PigTupleDefaultRawComparato
      * IntWritable.compare() is used. If both are null then the indices are
      * compared. Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
-        mHasNullField = false;
 
         Tuple t1;
         Tuple t2;
@@ -99,9 +100,16 @@ public class PigTupleDefaultRawComparato
 
         rc = compareTuple(t1, t2); //TODO think about how SchemaTuple could speed this up
 
+        // handle PIG-927. If tuples are equal but any field inside tuple is null,
+        // then we do not merge keys if indices are not same
+        if (rc == 0 && mHasNullField) {
+            rc = ((NullableTuple) t1).getIndex() - ((NullableTuple) t2).getIndex();
+        }
+
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableTuple nt1 = (NullableTuple) o1;
         NullableTuple nt2 = (NullableTuple) o2;
@@ -110,10 +118,16 @@ public class PigTupleDefaultRawComparato
         // If either are null, handle differently.
         if (!nt1.isNull() && !nt2.isNull()) {
             rc = compareTuple((Tuple) nt1.getValueAsPigType(), (Tuple) nt2.getValueAsPigType());
+            // handle PIG-927. If tuples are equal but any field inside tuple is null,
+            // then we do not merge keys if indices are not same
+            if (rc == 0 && mHasNullField) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nt1.isNull() && nt2.isNull())
-                rc = 0;
+            // Two nulls are equal if indices are same
+            if (nt1.isNull() && nt2.isNull()) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
             else if (nt1.isNull())
                 rc = -1;
             else
@@ -125,6 +139,7 @@ public class PigTupleDefaultRawComparato
     }
 
     private int compareTuple(Tuple t1, Tuple t2) {
+        mHasNullField = false;
         int sz1 = t1.size();
         int sz2 = t2.size();
         if (sz2 < sz1) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java Tue Aug 11 23:01:10 2015
@@ -23,9 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -90,18 +88,26 @@ public class PigTupleSortComparator exte
     }
 
     /**
-     * Compare two NullableTuples as raw bytes. Tuples are compared field-wise. If both are null they are defined equal.
+     * Compare two NullableTuples as raw bytes. Tuples are compared field-wise.
+     * If both are null, then the indices are compared.
      * Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
         if (b1[s1] == 0 && b2[s2] == 0) {
             // skip mNull and mIndex
             rc = mComparator.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+            // handle PIG-927. If tuples are equal but any field inside tuple is null,
+            // then we do not merge keys if indices are not same
+            if (rc == 0 && mComparator.hasComparedTupleNull()) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
         } else {
-            // for sorting purposes two nulls are equal, null sorts first
-            if (b1[s1] != 0 && b2[s2] != 0)
-                rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0)
                 rc = -1;
             else
@@ -112,6 +118,7 @@ public class PigTupleSortComparator exte
         return rc;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public int compare(Object o1, Object o2) {
         NullableTuple nt1 = (NullableTuple) o1;
@@ -121,10 +128,16 @@ public class PigTupleSortComparator exte
         // If either are null, handle differently.
         if (!nt1.isNull() && !nt2.isNull()) {
             rc = mComparator.compare((Tuple) nt1.getValueAsPigType(), (Tuple) nt2.getValueAsPigType());
+            // handle PIG-927. If tuples are equal but any field inside tuple is null,
+            // then we do not merge keys if indices are not same
+            if (rc == 0 && mComparator.hasComparedTupleNull()) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nt1.isNull() && nt2.isNull())
-                rc = 0;
+            // Two nulls are equal if indices are same
+            if (nt1.isNull() && nt2.isNull()) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
             else if (nt1.isNull())
                 rc = -1;
             else

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Aug 11 23:01:10 2015
@@ -58,18 +58,7 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigDecimalWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigIntegerWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBooleanWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingCharArrayWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDBAWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDateTimeWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDoubleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingFloatWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingIntWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingLongWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
@@ -739,7 +728,7 @@ public class TezDagBuilder extends TezOp
             parallel = -1;
         }
         Resource resource;
-        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB)!=null && 
+        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB)!=null &&
                 globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES)!=null) {
             resource = Resource.newInstance(globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
                     TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT),
@@ -992,7 +981,7 @@ public class TezDagBuilder extends TezOp
         selectOutputComparator(keyType, conf, tezOp);
     }
 
-    private static Class<? extends WritableComparator> comparatorForKeyType(byte keyType, boolean hasOrderBy)
+    private static Class<? extends WritableComparator> getRawComparator(byte keyType)
             throws JobCreationException {
 
         switch (keyType) {
@@ -1024,11 +1013,7 @@ public class TezDagBuilder extends TezOp
             return PigTextRawComparator.class;
 
         case DataType.BYTEARRAY:
-            //if (hasOrderBy) {
-                return PigBytesRawComparator.class;
-            //} else {
-            //    return PigDBAWritableComparator.class;
-            //}
+            return PigBytesRawComparator.class;
 
         case DataType.MAP:
             int errCode = 1068;
@@ -1036,68 +1021,7 @@ public class TezDagBuilder extends TezOp
             throw new JobCreationException(msg, errCode, PigException.INPUT);
 
         case DataType.TUPLE:
-            //TODO: PigTupleWritableComparator gives wrong results with cogroup in
-            //Checkin_2 and few other e2e tests. But MR has PigTupleWritableComparator
-            //Investigate the difference later
-            //if (hasOrderBy) {
-                return PigTupleSortComparator.class;
-            //} else {
-            //    return PigTupleWritableComparator.class;
-            //}
-
-        case DataType.BAG:
-            errCode = 1068;
-            msg = "Using Bag as key not supported.";
-            throw new JobCreationException(msg, errCode, PigException.INPUT);
-
-        default:
-            errCode = 2036;
-            msg = "Unhandled key type " + DataType.findTypeName(keyType);
-            throw new JobCreationException(msg, errCode, PigException.BUG);
-        }
-    }
-
-    private static Class<? extends WritableComparator> getGroupingComparatorForKeyType(byte keyType)
-            throws JobCreationException {
-
-        switch (keyType) {
-        case DataType.BOOLEAN:
-            return PigGroupingBooleanWritableComparator.class;
-
-        case DataType.INTEGER:
-            return PigGroupingIntWritableComparator.class;
-
-        case DataType.BIGINTEGER:
-            return PigGroupingBigIntegerWritableComparator.class;
-
-        case DataType.BIGDECIMAL:
-            return PigGroupingBigDecimalWritableComparator.class;
-
-        case DataType.LONG:
-            return PigGroupingLongWritableComparator.class;
-
-        case DataType.FLOAT:
-            return PigGroupingFloatWritableComparator.class;
-
-        case DataType.DOUBLE:
-            return PigGroupingDoubleWritableComparator.class;
-
-        case DataType.DATETIME:
-            return PigGroupingDateTimeWritableComparator.class;
-
-        case DataType.CHARARRAY:
-            return PigGroupingCharArrayWritableComparator.class;
-
-        case DataType.BYTEARRAY:
-            return PigGroupingDBAWritableComparator.class;
-
-        case DataType.MAP:
-            int errCode = 1068;
-            String msg = "Using Map as key not supported.";
-            throw new JobCreationException(msg, errCode, PigException.INPUT);
-
-        case DataType.TUPLE:
-            return PigGroupingTupleWritableComparator.class;
+            return PigTupleSortComparator.class;
 
         case DataType.BAG:
             errCode = 1068;
@@ -1127,29 +1051,11 @@ public class TezDagBuilder extends TezOp
                         PigGroupingPartitionWritableComparator.class.getName());
                 setGroupingComparator(conf, PigGroupingPartitionWritableComparator.class.getName());
             } else {
-                boolean hasOrderby = hasOrderby(tezOp);
                 conf.setClass(
                         TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
-                        comparatorForKeyType(keyType, hasOrderby), RawComparator.class);
-                if (!hasOrderby) {
-                    setGroupingComparator(conf, getGroupingComparatorForKeyType(keyType).getName());
-                }
-            }
-        }
-    }
-
-    private boolean hasOrderby(TezOperator tezOp) {
-        boolean hasOrderBy = tezOp.isGlobalSort() || tezOp.isLimitAfterSort();
-        if (!hasOrderBy) {
-            // Check if it is a Orderby sampler job
-            List<TezOperator> succs = getPlan().getSuccessors(tezOp);
-            if (succs != null && succs.size() == 1) {
-                if (succs.get(0).isGlobalSort()) {
-                    hasOrderBy = true;
-                }
+                        getRawComparator(keyType), RawComparator.class);
             }
         }
-        return hasOrderBy;
     }
 
     private void setGroupingComparator(Configuration conf, String comparatorClass) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Tue Aug 11 23:01:10 2015
@@ -93,9 +93,9 @@ public class POShuffleTezLoad extends PO
         try {
             for (String inputKey : inputKeys) {
                 LogicalInput input = inputs.get(inputKey);
-                // 1) Case of self join/cogroup/cross with Split.
+                // 1) Case of self join/cogroup/cross with Split - numTezInputs < numInputs/inputKeys
                 //     - Same TezInput will contain multiple indexes in case of join
-                // 2) data unioned within Split
+                // 2) data unioned within Split - inputKeys > numInputs/numTezInputs
                 //     - Input key will be repeated, but index would be same within a TezInput
                 if (!this.inputs.contains(input)) {
                     this.inputs.add(input);
@@ -133,7 +133,6 @@ public class POShuffleTezLoad extends PO
             boolean hasData = false;
             Object cur = null;
             PigNullableWritable min = null;
-            int minIndex = -1;
 
             try {
                 for (int i = 0; i < numTezInputs; i++) {
@@ -143,7 +142,6 @@ public class POShuffleTezLoad extends PO
                         if (min == null || comparator.compare(min, cur) > 0) {
                             //Not a deep clone. Writable is referenced.
                             min = ((PigNullableWritable)cur).clone();
-                            minIndex = i;
                         }
                     }
                 }
@@ -169,7 +167,6 @@ public class POShuffleTezLoad extends PO
                 if (isAccumulative()) {
 
                     buffer.setCurrentKey(min);
-                    buffer.setCurrentKeyIndex(minIndex);
                     for (int i = 0; i < numInputs; i++) {
                         bags[i] = new AccumulativeBag(buffer, i);
                     }
@@ -185,8 +182,7 @@ public class POShuffleTezLoad extends PO
                         if (!finished[i]) {
                             cur = readers.get(i).getCurrentKey();
                             // We need to loop in case of Grouping Comparators
-                            while (comparator.compare(min, cur) == 0
-                                    && (!min.isNull() || (min.isNull() && i == minIndex))) {
+                            while (comparator.compare(min, cur) == 0) {
                                 Iterable<Object> vals = readers.get(i).getCurrentValues();
                                 for (Object val : vals) {
                                     NullableTuple nTup = (NullableTuple) val;
@@ -241,7 +237,6 @@ public class POShuffleTezLoad extends PO
         private int batchSize;
         private List<Tuple>[] bags;
         private PigNullableWritable min;
-        private int minIndex;
         private boolean clearedCurrent = true;
 
         @SuppressWarnings("unchecked")
@@ -262,10 +257,6 @@ public class POShuffleTezLoad extends PO
             clearedCurrent = false;
         }
 
-        public void setCurrentKeyIndex(int curKeyIndex) {
-            this.minIndex = curKeyIndex;
-        }
-
         @Override
         public boolean hasNextBatch() {
             Object cur = null;
@@ -273,8 +264,7 @@ public class POShuffleTezLoad extends PO
                 for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
-                        if (comparator.compare(min, cur) == 0
-                                && (!min.isNull() || (min.isNull() && i == minIndex))) {
+                        if (comparator.compare(min, cur) == 0) {
                             return true;
                         }
                     }
@@ -297,8 +287,7 @@ public class POShuffleTezLoad extends PO
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
                         int batchCount = 0;
-                        while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
-                                min.isNull() && i==minIndex)) {
+                        while (comparator.compare(min, cur) == 0) {
                             Iterator<Object> iter = readers.get(i).getCurrentValues().iterator();
                             while (iter.hasNext() && batchCount < batchSize) {
                                 NullableTuple nTup = (NullableTuple) iter.next();
@@ -339,8 +328,7 @@ public class POShuffleTezLoad extends PO
                 for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
-                        while (comparator.compare(min, cur) == 0 && (!min.isNull() ||
-                                min.isNull() && i==minIndex)) {
+                        while (comparator.compare(min, cur) == 0) {
                             finished[i] = !readers.get(i).next();
                             if (finished[i]) {
                                 break;

Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1695396&r1=1695395&r2=1695396&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/trunk/test/e2e/pig/tests/multiquery.conf Tue Aug 11 23:01:10 2015
@@ -572,8 +572,6 @@ $cfg = {
         
         {
         'name' => 'MultiQuery_Union',
-        'floatpostprocess' => 1,
-        'delimiter' => '    ',
         'tests' => [
             { 
             # Multiple levels of union + join
@@ -639,6 +637,8 @@ store e into ':OUTPATH:';\,
             { 
             # Union + Groupby + Combiner
             'num' => 6,
+            'floatpostprocess' => 1,
+            'delimiter' => '    ',
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 a1 = filter a by gpa >= 3.9;
 a2 = filter a by gpa < 2;
@@ -650,6 +650,8 @@ store e into ':OUTPATH:';\,
             {
             # Union + Groupby + Secondary key partitioner
             'num' => 7,
+            'floatpostprocess' => 1,
+            'delimiter' => '    ',
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 a1 = filter a by gpa >= 3.9;
 a2 = filter a by gpa < 2;
@@ -674,13 +676,11 @@ store d into ':OUTPATH:';\,
         
         {
         'name' => 'MultiQuery_Self',
-        'floatpostprocess' => 1,
-        'delimiter' => '    ',
         'tests' => [
             # Self cross
             {
             'num' => 1,
-            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
 b = filter a by gpa >= 3.9;
 c = filter a by gpa <= 0.5;
 d = filter a by gpa >= 3.5 and gpa < 3.9;
@@ -693,8 +693,8 @@ store g into ':OUTPATH:.2';\,
             {
             # Self cogroup
             'num' => 2,
-            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by gpa >= 3.9;
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
 c = filter a by gpa < 2;
 d = cogroup c by name, b by name;
 e = foreach d generate flatten(c), flatten(b);
@@ -703,18 +703,18 @@ store e into ':OUTPATH:';\,
             {
             # Three way join (two self)
             'num' => 3,
-            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by gpa >= 3.9;
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
 c = filter a by gpa < 2;
-d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
 e = join b by name, c by name, d by name PARALLEL 2;
 store e into ':OUTPATH:';\,
             },
             {
             # Self join replicated
             'num' => 4,
-            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by gpa >= 3.9;
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
 c = filter a by gpa < 2;
 d = join c by name, b by name using 'replicated';
 store d into ':OUTPATH:';\,
@@ -722,12 +722,39 @@ store d into ':OUTPATH:';\,
             {
             # Self join skewed
             'num' => 5,
-            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by gpa >= 3.9;
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
 c = filter a by gpa < 2;
 d = join c by name, b by name using 'skewed' PARALLEL 2;
 store d into ':OUTPATH:';\,
             },
+            {
+            # Self join left outer
+            'num' => 6,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name left outer, b by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join right outer
+            'num' => 7,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name right outer, b by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join full outer
+            'num' => 8,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name full outer, b by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+            }
             ] # end of tests
         },