You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/08/18 21:56:27 UTC

svn commit: r1696491 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/backend/had...

Author: rohini
Date: Tue Aug 18 19:56:26 2015
New Revision: 1696491

URL: http://svn.apache.org/r1696491
Log:
PIG-4657: [Pig on Tez] Optimize GroupBy and Distinct key comparison (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    pig/trunk/test/e2e/pig/tests/nightly.conf

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 18 19:56:26 2015
@@ -462,6 +462,9 @@ PIG-3939: SPRINTF function to format str
 PIG-3970: Merge Tez branch into trunk (daijy)
  
 OPTIMIZATIONS
+
+PIG-4657: [Pig on Tez] Optimize GroupBy and Distinct key comparison (rohini)
+
  
 BUG FIXES
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java Tue Aug 18 19:56:26 2015
@@ -17,12 +17,341 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 
 public class PigWritableComparators {
 
+    // Byte-only raw comparators for faster comparison in non-orderby jobs. These do not re-use
+    // JobControlCompiler.Pig<DataType>WritableComparator, which extends PigWritableComparator.
+    // Those use PigNullablePartitionWritable.compareTo, which is inefficient in cases like
+    // tuple, where the tuple is iterated for null checking instead of taking advantage of
+    // TupleRawComparator.hasComparedTupleNull(). The byte-only ones also skip multi-query index checking.
+
+    public static class PigBooleanRawBytesComparator extends PigBooleanRawComparator {
+
+        public PigBooleanRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigIntRawBytesComparator extends PigIntRawComparator {
+
+        public PigIntRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigBigIntegerRawBytesComparator extends PigBigIntegerRawComparator {
+
+        public PigBigIntegerRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigBigDecimalRawBytesComparator extends PigBigDecimalRawComparator {
+
+        public PigBigDecimalRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigLongRawBytesComparator extends PigLongRawComparator {
+
+        public PigLongRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigFloatRawBytesComparator extends PigFloatRawComparator {
+
+        public PigFloatRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigDoubleRawBytesComparator extends PigDoubleRawComparator {
+
+        public PigDoubleRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigDateTimeRawBytesComparator extends PigDateTimeRawComparator {
+
+        public PigDateTimeRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigTextRawBytesComparator extends PigTextRawComparator {
+
+        public PigTextRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigBytesRawBytesComparator extends PigBytesRawComparator {
+
+        public PigBytesRawBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    public static class PigTupleSortBytesComparator extends PigTupleSortComparator {
+
+        public PigTupleSortBytesComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    //
+    // Byte-only raw comparators for faster comparison for Skewed Join.
+    //
+    public static class PigBooleanRawBytesPartitionComparator extends PigBooleanRawComparator {
+
+        public PigBooleanRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigIntRawBytesPartitionComparator extends PigIntRawComparator {
+
+        public PigIntRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigIntegerRawBytesPartitionComparator extends PigBigIntegerRawComparator {
+
+        public PigBigIntegerRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigDecimalRawBytesPartitionComparator extends PigBigDecimalRawComparator {
+
+        public PigBigDecimalRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigLongRawBytesPartitionComparator extends PigLongRawComparator {
+
+        public PigLongRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigFloatRawBytesPartitionComparator extends PigFloatRawComparator {
+
+        public PigFloatRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDoubleRawBytesPartitionComparator extends PigDoubleRawComparator {
+
+        public PigDoubleRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDateTimeRawBytesPartitionComparator extends PigDateTimeRawComparator {
+
+        public PigDateTimeRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTextRawBytesPartitionComparator extends PigTextRawComparator {
+
+        public PigTextRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBytesRawBytesPartitionComparator extends PigBytesRawComparator {
+
+        public PigBytesRawBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTupleSortBytesPartitionComparator extends PigTupleSortComparator {
+
+        public PigTupleSortBytesPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
     //
-    // Raw Comparators for Skewed Join
+    //  Raw Comparators for Skewed Join
     //
     public static class PigBooleanRawPartitionComparator extends PigBooleanRawComparator {
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Aug 18 19:56:26 2015
@@ -283,7 +283,7 @@ public class TezDagBuilder extends TezOp
                         if (tezOp.isVertexGroup()) {
                             groupMembers[i] = from;
                         } else {
-                            EdgeProperty prop = newEdge(pred, tezOp);
+                            EdgeProperty prop = newEdge(pred, tezOp, false);
                             Edge edge = Edge.create(from, to, prop);
                             dag.addEdge(edge);
                         }
@@ -311,7 +311,7 @@ public class TezDagBuilder extends TezOp
     private GroupInputEdge newGroupInputEdge(TezOperator fromOp,
             TezOperator toOp, VertexGroup from, Vertex to) throws IOException {
 
-        EdgeProperty edgeProperty = newEdge(fromOp, toOp);
+        EdgeProperty edgeProperty = newEdge(fromOp, toOp, true);
 
         String groupInputClass = ConcatenatedMergedKeyValueInput.class.getName();
 
@@ -334,7 +334,7 @@ public class TezDagBuilder extends TezOp
      * @return EdgeProperty
      * @throws IOException
      */
-    private EdgeProperty newEdge(TezOperator from, TezOperator to)
+    private EdgeProperty newEdge(TezOperator from, TezOperator to, boolean isMergedInput)
             throws IOException {
         TezEdgeDescriptor edge = to.inEdges.get(from.getOperatorKey());
         PhysicalPlan combinePlan = edge.combinePlan;
@@ -345,7 +345,7 @@ public class TezDagBuilder extends TezOp
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
         UDFContext.getUDFContext().serialize(conf);
         if (!combinePlan.isEmpty()) {
-            addCombiner(combinePlan, to, conf);
+            addCombiner(combinePlan, to, conf, isMergedInput);
         }
 
         List<POLocalRearrangeTez> lrs = PlanHelper.getPhysicalOperators(from.plan,
@@ -354,7 +354,7 @@ public class TezDagBuilder extends TezOp
         for (POLocalRearrangeTez lr : lrs) {
             if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
-                setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage());
+                setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
                 // In case of secondary key sort, main key type is the actual key type
                 conf.set("pig.reduce.key.type", Byte.toString(lr.getMainKeyType()));
                 break;
@@ -435,11 +435,11 @@ public class TezDagBuilder extends TezOp
     }
 
     private void addCombiner(PhysicalPlan combinePlan, TezOperator pkgTezOp,
-            Configuration conf) throws IOException {
+            Configuration conf, boolean isMergedInput) throws IOException {
         POPackage combPack = (POPackage) combinePlan.getRoots().get(0);
         POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
                 .getLeaves().get(0);
-        setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp);
+        setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp, true, isMergedInput);
 
         LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
                 combinePlan, null, pkgTezOp, combPack);
@@ -538,13 +538,14 @@ public class TezDagBuilder extends TezOp
             byte keyType = pack.getPkgr().getKeyType();
             tezOp.plan.remove(pack);
             payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
-            setIntermediateOutputKeyValue(keyType, payloadConf, tezOp);
+
             POShuffleTezLoad newPack = new POShuffleTezLoad(pack);
             if (tezOp.isSkewedJoin()) {
                 newPack.setSkewedJoins(true);
             }
             tezOp.plan.add(newPack);
 
+            boolean isMergedInput = false;
             // Set input keys for POShuffleTezLoad. This is used to identify
             // the inputs that are attached to the POShuffleTezLoad in the
             // backend.
@@ -554,7 +555,9 @@ public class TezDagBuilder extends TezOp
                     // skip sample vertex input
                 } else {
                     String inputKey = pred.getOperatorKey().toString();
+                    boolean isVertexGroup = false;
                     if (pred.isVertexGroup()) {
+                        isVertexGroup = true;
                         pred = mPlan.getOperator(pred.getVertexGroupMembers().get(0));
                     }
                     LinkedList<POLocalRearrangeTez> lrs =
@@ -563,6 +566,9 @@ public class TezDagBuilder extends TezOp
                         if (lr.isConnectedToPackage()
                                 && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
                             localRearrangeMap.put((int) lr.getIndex(), inputKey);
+                            if (isVertexGroup) {
+                                isMergedInput = true;
+                            }
                         }
                     }
                 }
@@ -577,7 +583,8 @@ public class TezDagBuilder extends TezOp
                 }
             }
 
-            setIntermediateOutputKeyValue(pack.getPkgr().getKeyType(), payloadConf, tezOp);
+            //POShuffleTezLoad accesses the comparator setting
+            selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
         } else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) {
             POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0);
             // TODO Need to fix multiple input key mapping
@@ -953,14 +960,9 @@ public class TezDagBuilder extends TezOp
         return stores;
     }
 
-    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
-            throws JobCreationException, ExecException {
-        setIntermediateOutputKeyValue(keyType, conf, tezOp, true);
-    }
-
     @SuppressWarnings("rawtypes")
     private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp,
-            boolean isConnectedToPackage) throws JobCreationException, ExecException {
+            boolean isConnectedToPackage, boolean isMergedInput) throws JobCreationException, ExecException {
         if (tezOp != null && tezOp.isUseSecondaryKey() && isConnectedToPackage) {
             conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
                     NullableTuple.class.getName());
@@ -978,12 +980,86 @@ public class TezDagBuilder extends TezOp
                 NullableTuple.class.getName());
         conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
                 MRPartitioner.class.getName());
-        selectOutputComparator(keyType, conf, tezOp);
+        selectKeyComparator(keyType, conf, tezOp, isMergedInput);
+    }
+
+    private static Class<? extends WritableComparator> getRawBytesComparator(
+            byte keyType) throws JobCreationException {
+
+        // These comparators only compare bytes. For faster sorting, we use them
+        // for everything except order by.
+        // This ordering is good enough to be fed to reducer (POShuffleTezLoad)
+        // which will use the full comparator (GroupingComparator) for correct
+        // sorting and grouping.
+        // TODO: PIG-4652. Till Tez exposes a way to get bytes of keys being compared,
+        // we can use this only for groupby and distinct which are single inputs in
+        // POShuffleTezLoad and not join which has multiple inputs.
+
+        switch (keyType) {
+        case DataType.BOOLEAN:
+            return PigWritableComparators.PigBooleanRawBytesComparator.class;
+
+        case DataType.INTEGER:
+            return PigWritableComparators.PigIntRawBytesComparator.class;
+
+        case DataType.BIGINTEGER:
+            return PigWritableComparators.PigBigIntegerRawBytesComparator.class;
+
+        case DataType.BIGDECIMAL:
+            return PigWritableComparators.PigBigDecimalRawBytesComparator.class;
+
+        case DataType.LONG:
+            return PigWritableComparators.PigLongRawBytesComparator.class;
+
+        case DataType.FLOAT:
+            return PigWritableComparators.PigFloatRawBytesComparator.class;
+
+        case DataType.DOUBLE:
+            return PigWritableComparators.PigDoubleRawBytesComparator.class;
+
+        case DataType.DATETIME:
+            return PigWritableComparators.PigDateTimeRawBytesComparator.class;
+
+        case DataType.CHARARRAY:
+            return PigWritableComparators.PigTextRawBytesComparator.class;
+
+        case DataType.BYTEARRAY:
+            return PigWritableComparators.PigBytesRawBytesComparator.class;
+
+        case DataType.MAP:
+            int errCode = 1068;
+            String msg = "Using Map as key not supported.";
+            throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+        case DataType.TUPLE:
+            return PigWritableComparators.PigTupleSortBytesComparator.class;
+
+        case DataType.BAG:
+            errCode = 1068;
+            msg = "Using Bag as key not supported.";
+            throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+        default:
+            errCode = 2036;
+            msg = "Unhandled key type " + DataType.findTypeName(keyType);
+            throw new JobCreationException(msg, errCode, PigException.BUG);
+        }
     }
 
     private static Class<? extends WritableComparator> getRawComparator(byte keyType)
             throws JobCreationException {
 
+        // These are full comparators used in order by jobs and as GroupingComparator in
+        // POShuffleTezLoad for other operations.
+
+        // MapReduce uses PigGrouping<DataType>WritableComparator for non-orderby jobs.
+        // In Tez, we use the raw comparators themselves on the reduce side as well, since they are
+        // now fixed to handle nulls for different indexes.
+        // Also, PigGrouping<DataType>WritableComparator uses PigNullablePartitionWritable.compareTo,
+        // which is inefficient for cases like tuple, where the tuple is iterated for null checking
+        // instead of taking advantage of TupleRawComparator.hasComparedTupleNull().
+        // The raw comparators also skip multi-query index checking.
+
         switch (keyType) {
         case DataType.BOOLEAN:
             return PigBooleanRawComparator.class;
@@ -1035,6 +1111,61 @@ public class TezDagBuilder extends TezOp
         }
     }
 
+    private static Class<? extends WritableComparator> getRawBytesComparatorForSkewedJoin(byte keyType)
+            throws JobCreationException {
+
+        // Extended Raw Bytes Comparators for SkewedJoin which unwrap the NullablePartitionWritable
+        switch (keyType) {
+        case DataType.BOOLEAN:
+            return PigWritableComparators.PigBooleanRawBytesPartitionComparator.class;
+
+        case DataType.INTEGER:
+            return PigWritableComparators.PigIntRawBytesPartitionComparator.class;
+
+        case DataType.BIGINTEGER:
+            return PigWritableComparators.PigBigIntegerRawBytesPartitionComparator.class;
+
+        case DataType.BIGDECIMAL:
+            return PigWritableComparators.PigBigDecimalRawBytesPartitionComparator.class;
+
+        case DataType.LONG:
+            return PigWritableComparators.PigLongRawBytesPartitionComparator.class;
+
+        case DataType.FLOAT:
+            return PigWritableComparators.PigFloatRawBytesPartitionComparator.class;
+
+        case DataType.DOUBLE:
+            return PigWritableComparators.PigDoubleRawBytesPartitionComparator.class;
+
+        case DataType.DATETIME:
+            return PigWritableComparators.PigDateTimeRawBytesPartitionComparator.class;
+
+        case DataType.CHARARRAY:
+            return PigWritableComparators.PigTextRawBytesPartitionComparator.class;
+
+        case DataType.BYTEARRAY:
+            return PigWritableComparators.PigBytesRawBytesPartitionComparator.class;
+
+        case DataType.MAP:
+            int errCode = 1068;
+            String msg = "Using Map as key not supported.";
+            throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+        case DataType.TUPLE:
+            return PigWritableComparators.PigTupleSortBytesPartitionComparator.class;
+
+        case DataType.BAG:
+            errCode = 1068;
+            msg = "Using Bag as key not supported.";
+            throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+        default:
+            errCode = 2036;
+            msg = "Unhandled key type " + DataType.findTypeName(keyType);
+            throw new JobCreationException(msg, errCode, PigException.BUG);
+        }
+    }
+
     private static Class<? extends WritableComparator> getRawComparatorForSkewedJoin(byte keyType)
             throws JobCreationException {
 
@@ -1090,7 +1221,7 @@ public class TezDagBuilder extends TezOp
         }
     }
 
-    void selectOutputComparator(byte keyType, Configuration conf, TezOperator tezOp)
+    void selectKeyComparator(byte keyType, Configuration conf, TezOperator tezOp, boolean isMergedInput)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
         // TODO: Group comparators as in JobControlCompiler
@@ -1102,7 +1233,13 @@ public class TezDagBuilder extends TezOp
                     PigSecondaryKeyComparator.class.getName());
             setGroupingComparator(conf, PigSecondaryKeyGroupComparator.class.getName());
         } else {
-            if (tezOp.isSkewedJoin()) {
+            // If it is not a merged input (OrderedGroupedMergedKVInput) from union, then
+            // use the bytes-only comparator. This is temporary until PIG-4652 is done.
+            if (!isMergedInput && (tezOp.isGroupBy() || tezOp.isDistinct())) {
+                conf.setClass(
+                        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawBytesComparator(keyType), RawComparator.class);
+            } else if (tezOp.isSkewedJoin()) {
                 conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
                         getRawComparatorForSkewedJoin(keyType), RawComparator.class);
             } else {
@@ -1110,9 +1247,58 @@ public class TezDagBuilder extends TezOp
                         TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
                         getRawComparator(keyType), RawComparator.class);
             }
+
+            // Comparators now
+            //             groupby/distinct : Comparator - RawBytesComparator
+            // groupby/distinct after union : Comparator - RawComparator
+            //                      orderby : Comparator - RawComparator
+            //                  skewed join : Comparator - RawPartitionComparator
+            //           Rest (other joins) : Comparator - RawComparator
+
+            //TODO: In PIG-4652: After Tez support for exposing key bytes
+            //    groupby/distinct : Comparator - RawBytesComparator. No grouping comparator required.
+            //             orderby : Comparator - RawComparator. No grouping comparator required.
+            //         skewed join : Comparator - RawBytesPartitionComparator, GroupingComparator - RawPartitionComparator
+            //  Rest (other joins) : Comparator - RawBytesComparator, GroupingComparator - RawComparator
+
+            /*
+            if (tezOp.isSkewedJoin()) {
+                conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawBytesComparatorForSkewedJoin(keyType), RawComparator.class);
+                setGroupingComparator(conf,  getRawComparatorForSkewedJoin(keyType).getName());
+            } else if (tezOp.isGroupBy() || tezOp.isDistinct()) {
+                conf.setClass(
+                        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawBytesComparator(keyType), RawComparator.class);
+            } else if (hasOrderby(tezOp)) {
+                conf.setClass(
+                        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawComparator(keyType), RawComparator.class);
+            } else {
+                conf.setClass(
+                        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawBytesComparator(keyType), RawComparator.class);
+                setGroupingComparator(conf, getRawComparator(keyType).getName());
+            }
+            */
         }
     }
 
+    private boolean hasOrderby(TezOperator tezOp) {
+        boolean hasOrderBy = tezOp.isGlobalSort() || tezOp.isLimitAfterSort();
+        if (!hasOrderBy) {
+            // Check if it is an Orderby sampler job
+            List<TezOperator> succs = getPlan().getSuccessors(tezOp);
+            if (succs != null && succs.size() == 1) {
+                if (succs.get(0).isGlobalSort()) {
+                    hasOrderBy = true;
+                }
+            }
+        }
+        return hasOrderBy;
+    }
+
+
     private void setGroupingComparator(Configuration conf, String comparatorClass) {
         // In MR - job.setGroupingComparatorClass() or MRJobConfig.GROUP_COMPARATOR_CLASS
         // TODO: Check why tez-mapreduce ReduceProcessor use two different tez

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Tue Aug 18 19:56:26 2015
@@ -688,6 +688,7 @@ public class TezCompiler extends PhyPlan
             clr.setDistinct(true);
             combinePlan.addAsLeaf(clr);
 
+            curTezOp.markDistinct();
             addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             phyToTezOpMap.put(op, curTezOp);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Tue Aug 18 19:56:26 2015
@@ -173,6 +173,8 @@ public class TezOperator extends Operato
         LIMIT_AFTER_SORT,
         // Indicate if this job is a union job
         UNION,
+        // Indicate if this job is a distinct job
+        DISTINCT,
         // Indicate if this job is a native job
         NATIVE;
     };
@@ -420,6 +422,14 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.UNION.ordinal());
     }
 
+    public boolean isDistinct() {
+        return feature.get(OPER_FEATURE.DISTINCT.ordinal()); // true once markDistinct() has set the DISTINCT bit
+    }
+
+    public void markDistinct() {
+        feature.set(OPER_FEATURE.DISTINCT.ordinal()); // flag this operator as a distinct job
+    }
+
     public boolean isNative() {
         return feature.get(OPER_FEATURE.NATIVE.ordinal());
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Tue Aug 18 19:56:26 2015
@@ -58,6 +58,7 @@ public class POShuffleTezLoad extends PO
     private transient boolean[] finished;
     private transient boolean[] readOnce;
     private transient WritableComparator comparator = null;
+    private transient WritableComparator groupingComparator = null;
     private transient Configuration conf;
     private transient int accumulativeBatchSize;
 
@@ -87,7 +88,8 @@ public class POShuffleTezLoad extends PO
         this.conf = conf;
         this.inputs = new ArrayList<LogicalInput>();
         this.readers = new ArrayList<KeyValuesReader>();
-        this.comparator = (WritableComparator) ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
+        this.comparator = (WritableComparator) ConfigUtils.getIntermediateInputKeyComparator(conf);
+        this.groupingComparator = (WritableComparator) ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
         this.accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
 
         try {
@@ -135,13 +137,25 @@ public class POShuffleTezLoad extends PO
             PigNullableWritable min = null;
 
             try {
-                for (int i = 0; i < numTezInputs; i++) {
-                    if (!finished[i]) {
+                if (numTezInputs == 1) {
+                    if (!finished[0]) {
                         hasData = true;
-                        cur = readers.get(i).getCurrentKey();
-                        if (min == null || comparator.compare(min, cur) > 0) {
-                            //Not a deep clone. Writable is referenced.
-                            min = ((PigNullableWritable)cur).clone();
+                        cur = readers.get(0).getCurrentKey();
+                        // Just move to the next key without comparison
+                        min = ((PigNullableWritable)cur).clone();
+                    }
+                } else {
+                    for (int i = 0; i < numTezInputs; i++) {
+                        if (!finished[i]) {
+                            hasData = true;
+                            cur = readers.get(i).getCurrentKey();
+                            // TODO: PIG-4652 should compare key bytes instead
+                            // of deserialized objects when using BytesComparator
+                            // for faster comparison
+                            if (min == null || comparator.compare(min, cur) > 0) {
+                                //Not a deep clone. Writable is referenced.
+                                min = ((PigNullableWritable)cur).clone();
+                            }
                         }
                     }
                 }
@@ -177,24 +191,40 @@ public class POShuffleTezLoad extends PO
                         bags[i] = new InternalCachedBag(numInputs);
                     }
 
-                    for (int i = 0; i < numTezInputs; i++) {
-
-                        if (!finished[i]) {
-                            cur = readers.get(i).getCurrentKey();
-                            // We need to loop in case of Grouping Comparators
-                            while (comparator.compare(min, cur) == 0) {
-                                Iterable<Object> vals = readers.get(i).getCurrentValues();
-                                for (Object val : vals) {
-                                    NullableTuple nTup = (NullableTuple) val;
-                                    int index = nTup.getIndex();
-                                    Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
-                                    bags[index].add(tup);
-                                }
-                                finished[i] = !readers.get(i).next();
-                                if (finished[i]) {
-                                    break;
-                                }
+                    if (numTezInputs == 1) {
+                        do {
+                            Iterable<Object> vals = readers.get(0).getCurrentValues();
+                            for (Object val : vals) {
+                                NullableTuple nTup = (NullableTuple) val;
+                                int index = nTup.getIndex();
+                                Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                bags[index].add(tup);
+                            }
+                            finished[0] = !readers.get(0).next();
+                            if (finished[0]) {
+                                break;
+                            }
+                            cur = readers.get(0).getCurrentKey();
+                        } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+                    } else {
+                        for (int i = 0; i < numTezInputs; i++) {
+                            if (!finished[i]) {
                                 cur = readers.get(i).getCurrentKey();
+                                // We need to loop in case of Grouping Comparators
+                                while (groupingComparator.compare(min, cur) == 0) {
+                                    Iterable<Object> vals = readers.get(i).getCurrentValues();
+                                    for (Object val : vals) {
+                                        NullableTuple nTup = (NullableTuple) val;
+                                        int index = nTup.getIndex();
+                                        Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                        bags[index].add(tup);
+                                    }
+                                    finished[i] = !readers.get(i).next();
+                                    if (finished[i]) {
+                                        break;
+                                    }
+                                    cur = readers.get(i).getCurrentKey();
+                                }
                             }
                         }
                     }
@@ -264,7 +294,7 @@ public class POShuffleTezLoad extends PO
                 for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
-                        if (comparator.compare(min, cur) == 0) {
+                        if (groupingComparator.compare(min, cur) == 0) {
                             return true;
                         }
                     }
@@ -287,7 +317,7 @@ public class POShuffleTezLoad extends PO
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
                         int batchCount = 0;
-                        while (comparator.compare(min, cur) == 0) {
+                        while (groupingComparator.compare(min, cur) == 0) {
                             Iterator<Object> iter = readers.get(i).getCurrentValues().iterator();
                             while (iter.hasNext() && batchCount < batchSize) {
                                 NullableTuple nTup = (NullableTuple) iter.next();
@@ -328,7 +358,7 @@ public class POShuffleTezLoad extends PO
                 for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
-                        while (comparator.compare(min, cur) == 0) {
+                        while (groupingComparator.compare(min, cur) == 0) {
                             finished[i] = !readers.get(i).next();
                             if (finished[i]) {
                                 break;

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Tue Aug 18 19:56:26 2015
@@ -1552,6 +1552,15 @@ d = cross a, c;
 e = union b, d;
 store e into ':OUTPATH:';\,
             },
+            { 
+            # Union + Distinct: store the distinct output (d) so the distinct is actually executed and verified
+            'num' => 16,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = distinct c;
+store d into ':OUTPATH:';\,
+            }
 		]
 		},
 		{