You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/08/12 01:03:16 UTC

svn commit: r1695398 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/impl/io/

Author: rohini
Date: Tue Aug 11 23:03:16 2015
New Revision: 1695398

URL: http://svn.apache.org/r1695398
Log:
PIG-4651: Optimize NullablePartitionWritable serialization for skewed join (rohini)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 11 23:03:16 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4651: Optimize NullablePartitionWritable serialization for skewed join (rohini)
+
 PIG-4627: [Pig on Tez] Self join does not handle null values correctly (rohini)
 
 PIG-4644: PORelationToExprProject.clone() is broken (erwaman via rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java Tue Aug 11 23:03:16 2015
@@ -86,6 +86,48 @@ public class HDataType {
         }
     }
 
+    /**
+     * Creates an empty nullable writable of the class matching the given DataType key type.
+     * @throws ExecException code 1068 for MAP keys, code 2044 for any other unhandled type
+     */
+    public static PigNullableWritable getNewWritableComparable(byte keyType) throws ExecException {
+        switch (keyType) {
+            case DataType.BAG:
+                return new NullableBag();
+            case DataType.BOOLEAN:
+                return new NullableBooleanWritable();
+            case DataType.BYTEARRAY:
+                return new NullableBytesWritable();
+            case DataType.CHARARRAY:
+                return new NullableText();
+            case DataType.DOUBLE:
+                return new NullableDoubleWritable();
+            case DataType.FLOAT:
+                return new NullableFloatWritable();
+            case DataType.INTEGER:
+                return new NullableIntWritable();
+            case DataType.BIGINTEGER:
+                return new NullableBigIntegerWritable();
+            case DataType.BIGDECIMAL:
+                return new NullableBigDecimalWritable();
+            case DataType.LONG:
+                return new NullableLongWritable();
+            case DataType.DATETIME:
+                return new NullableDateTimeWritable();
+            case DataType.TUPLE:
+                return new NullableTuple();
+            case DataType.MAP:
+                throw new ExecException("Using Map as key not supported.", 1068, PigException.INPUT);
+            default: {
+                if (typeToName == null) typeToName = DataType.genTypeToNameMap();
+                // Resolve the name first: written inline, '+' binds tighter than '==' and garbles the message.
+                String typeName = typeToName.get(keyType) == null ? "" + keyType : typeToName.get(keyType);
+                String msg = "The type " + typeName + " cannot be collected as a Key type";
+                throw new ExecException(msg, 2044, PigException.BUG);
+            }
+        }
+    }
+
     public static PigNullableWritable getWritableComparableTypes(Object o, byte keyType) throws ExecException{
 
         byte newKeyType = keyType;
@@ -261,6 +303,14 @@ public class HDataType {
         return wcKey;
     }
 
+    /** Returns the type byte classToTypeMap holds for className; throws ExecException when it has none. */
+    public static byte findTypeFromClassName(String className) throws ExecException {
+        if (!classToTypeMap.containsKey(className)) {
+            throw new ExecException("Unable to map " + className + " to known types." + Arrays.toString(classToTypeMap.keySet().toArray()));
+        }
+        return classToTypeMap.get(className);
+    }
+
     public static byte findTypeFromNullableWritable(PigNullableWritable o) throws ExecException {
         if (o instanceof NullableBooleanWritable)
             return DataType.BOOLEAN;

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java?rev=1695398&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java Tue Aug 11 23:03:16 2015
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.pig.impl.io.NullablePartitionWritable;
+
+public class PigWritableComparators {
+    // Raw Comparators for Skewed Join.
+    // Each nested class adapts the comparator it extends to NullablePartitionWritable keys:
+    // the raw-bytes compare steps over the leading key-type byte (offset + 1, length - 1),
+    // and the object compare unwraps both arguments with getKey() before delegating.
+    public static class PigBooleanRawPartitionComparator extends PigBooleanRawComparator {
+        // Explicit public no-arg constructor that only chains to the parent's.
+        public PigBooleanRawPartitionComparator() {
+            super();
+        }
+        // Raw form: the parent sees only the span that follows the leading type byte.
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the first byte which is the type of the key; shorten the lengths to match
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+        // Object form: compare the keys wrapped by the two NullablePartitionWritables.
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigIntRawPartitionComparator extends PigIntRawComparator {
+
+        public PigIntRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigIntegerRawPartitionComparator extends PigBigIntegerRawComparator {
+
+        public PigBigIntegerRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigDecimalRawPartitionComparator extends PigBigDecimalRawComparator {
+
+        public PigBigDecimalRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigLongRawPartitionComparator extends PigLongRawComparator {
+
+        public PigLongRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigFloatRawPartitionComparator extends PigFloatRawComparator {
+
+        public PigFloatRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDoubleRawPartitionComparator extends PigDoubleRawComparator {
+
+        public PigDoubleRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDateTimeRawPartitionComparator extends PigDateTimeRawComparator {
+
+        public PigDateTimeRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTextRawPartitionComparator extends PigTextRawComparator {
+
+        public PigTextRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBytesRawPartitionComparator extends PigBytesRawComparator {
+
+        public PigBytesRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTupleSortPartitionComparator extends PigTupleSortComparator {
+
+        public PigTupleSortPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.pig.impl.io.NullablePartitionWritable;
+
+public class PigWritableComparators {
+    // Raw Comparators for Skewed Join.
+    // Each nested class adapts the comparator it extends to NullablePartitionWritable keys:
+    // the raw-bytes compare steps over the leading key-type byte (offset + 1, length - 1),
+    // and the object compare unwraps both arguments with getKey() before delegating.
+    public static class PigBooleanRawPartitionComparator extends PigBooleanRawComparator {
+        // Explicit public no-arg constructor that only chains to the parent's.
+        public PigBooleanRawPartitionComparator() {
+            super();
+        }
+        // Raw form: the parent sees only the span that follows the leading type byte.
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the first byte which is the type of the key; shorten the lengths to match
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+        // Object form: compare the keys wrapped by the two NullablePartitionWritables.
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigIntRawPartitionComparator extends PigIntRawComparator {
+
+        public PigIntRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigIntegerRawPartitionComparator extends PigBigIntegerRawComparator {
+
+        public PigBigIntegerRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigDecimalRawPartitionComparator extends PigBigDecimalRawComparator {
+
+        public PigBigDecimalRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigLongRawPartitionComparator extends PigLongRawComparator {
+
+        public PigLongRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigFloatRawPartitionComparator extends PigFloatRawComparator {
+
+        public PigFloatRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDoubleRawPartitionComparator extends PigDoubleRawComparator {
+
+        public PigDoubleRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDateTimeRawPartitionComparator extends PigDateTimeRawComparator {
+
+        public PigDateTimeRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTextRawPartitionComparator extends PigTextRawComparator {
+
+        public PigTextRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBytesRawPartitionComparator extends PigBytesRawComparator {
+
+        public PigBytesRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTupleSortPartitionComparator extends PigTupleSortComparator {
+
+        public PigTupleSortPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Aug 11 23:03:16 2015
@@ -58,7 +58,6 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
@@ -77,6 +76,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -1035,21 +1035,76 @@ public class TezDagBuilder extends TezOp
         }
     }
 
+    /**
+     * Selects the extended raw comparator class for a skewed join key of the given type.
+     * These comparators unwrap the NullablePartitionWritable around the key.
+     *
+     * @param keyType DataType constant of the join key
+     * @throws JobCreationException code 1068 for MAP or BAG keys, code 2036 for unhandled types
+     */
+    private static Class<? extends WritableComparator> getRawComparatorForSkewedJoin(byte keyType)
+            throws JobCreationException {
+        Class<? extends WritableComparator> comparatorClass;
+        switch (keyType) {
+        case DataType.BOOLEAN:
+            comparatorClass = PigWritableComparators.PigBooleanRawPartitionComparator.class;
+            break;
+        case DataType.INTEGER:
+            comparatorClass = PigWritableComparators.PigIntRawPartitionComparator.class;
+            break;
+        case DataType.BIGINTEGER:
+            comparatorClass = PigWritableComparators.PigBigIntegerRawPartitionComparator.class;
+            break;
+        case DataType.BIGDECIMAL:
+            comparatorClass = PigWritableComparators.PigBigDecimalRawPartitionComparator.class;
+            break;
+        case DataType.LONG:
+            comparatorClass = PigWritableComparators.PigLongRawPartitionComparator.class;
+            break;
+        case DataType.FLOAT:
+            comparatorClass = PigWritableComparators.PigFloatRawPartitionComparator.class;
+            break;
+        case DataType.DOUBLE:
+            comparatorClass = PigWritableComparators.PigDoubleRawPartitionComparator.class;
+            break;
+        case DataType.DATETIME:
+            comparatorClass = PigWritableComparators.PigDateTimeRawPartitionComparator.class;
+            break;
+        case DataType.CHARARRAY:
+            comparatorClass = PigWritableComparators.PigTextRawPartitionComparator.class;
+            break;
+        case DataType.BYTEARRAY:
+            comparatorClass = PigWritableComparators.PigBytesRawPartitionComparator.class;
+            break;
+        case DataType.TUPLE:
+            comparatorClass = PigWritableComparators.PigTupleSortPartitionComparator.class;
+            break;
+        case DataType.MAP:
+            throw new JobCreationException("Using Map as key not supported.", 1068, PigException.INPUT);
+        case DataType.BAG:
+            throw new JobCreationException("Using Bag as key not supported.", 1068, PigException.INPUT);
+        default:
+            throw new JobCreationException("Unhandled key type " + DataType.findTypeName(keyType),
+                    2036, PigException.BUG);
+        }
+        return comparatorClass;
+    }
+
     void selectOutputComparator(byte keyType, Configuration conf, TezOperator tezOp)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
         // TODO: Group comparators as in JobControlCompiler
-        if (tezOp != null && tezOp.isUseSecondaryKey()) {
+        if (tezOp == null) {
+            return;
+        }
+        if (tezOp.isUseSecondaryKey()) {
             conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
                     PigSecondaryKeyComparator.class.getName());
             setGroupingComparator(conf, PigSecondaryKeyGroupComparator.class.getName());
         } else {
-            if (tezOp != null && tezOp.isSkewedJoin()) {
-                // TODO: PigGroupingPartitionWritableComparator only used as Group comparator in MR.
-                // What should be TEZ_RUNTIME_KEY_COMPARATOR_CLASS if same as MR?
-                conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
-                        PigGroupingPartitionWritableComparator.class.getName());
-                setGroupingComparator(conf, PigGroupingPartitionWritableComparator.class.getName());
+            if (tezOp.isSkewedJoin()) {
+                conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawComparatorForSkewedJoin(keyType), RawComparator.class);
             } else {
                 conf.setClass(
                         TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,

Modified: pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java Tue Aug 11 23:03:16 2015
@@ -70,9 +70,9 @@ public class NullablePartitionWritable e
 
 	@Override
     public void readFields(DataInput in) throws IOException {
-		String c = in.readUTF();
+		byte type = in.readByte();
 		try {
-			key = HDataType.getWritableComparable(c);
+			key = HDataType.getNewWritableComparable(type);
 		} catch(Exception e) {
 			throw new IOException(e);
 		}
@@ -81,7 +81,7 @@ public class NullablePartitionWritable e
 
 	@Override
     public void write(DataOutput out) throws IOException {
-		out.writeUTF(key.getClass().getName());
+		out.writeByte(HDataType.findTypeFromClassName(key.getClass().getName()));
 		key.write(out);
 	}