You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/08/12 01:03:16 UTC
svn commit: r1695398 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/impl/io/
Author: rohini
Date: Tue Aug 11 23:03:16 2015
New Revision: 1695398
URL: http://svn.apache.org/r1695398
Log:
PIG-4651: Optimize NullablePartitionWritable serialization for skewed join (rohini)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 11 23:03:16 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4651: Optimize NullablePartitionWritable serialization for skewed join (rohini)
+
PIG-4627: [Pig on Tez] Self join does not handle null values correctly (rohini)
PIG-4644: PORelationToExprProject.clone() is broken (erwaman via rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java Tue Aug 11 23:03:16 2015
@@ -86,6 +86,48 @@ public class HDataType {
}
}
+    /**
+     * Creates a new, empty nullable writable of the class that corresponds to
+     * the given Pig key type.
+     *
+     * @param keyType one of the {@code DataType} byte constants
+     * @return a newly constructed nullable writable for {@code keyType}
+     * @throws ExecException with error code 1068 when {@code keyType} is
+     *         {@code DataType.MAP}, and with error code 2044 for any other
+     *         type that has no case below
+     */
+    public static PigNullableWritable getNewWritableComparable(byte keyType) throws ExecException {
+        switch (keyType) {
+        case DataType.BAG:
+            return new NullableBag();
+        case DataType.BOOLEAN:
+            return new NullableBooleanWritable();
+        case DataType.BYTEARRAY:
+            return new NullableBytesWritable();
+        case DataType.CHARARRAY:
+            return new NullableText();
+        case DataType.DOUBLE:
+            return new NullableDoubleWritable();
+        case DataType.FLOAT:
+            return new NullableFloatWritable();
+        case DataType.INTEGER:
+            return new NullableIntWritable();
+        case DataType.BIGINTEGER:
+            return new NullableBigIntegerWritable();
+        case DataType.BIGDECIMAL:
+            return new NullableBigDecimalWritable();
+        case DataType.LONG:
+            return new NullableLongWritable();
+        case DataType.DATETIME:
+            return new NullableDateTimeWritable();
+        case DataType.TUPLE:
+            return new NullableTuple();
+        case DataType.MAP: {
+            int errCode = 1068;
+            String msg = "Using Map as key not supported.";
+            throw new ExecException(msg, errCode, PigException.INPUT);
+        }
+        default: {
+            if (typeToName == null) typeToName = DataType.genTypeToNameMap();
+            int errCode = 2044;
+            // The conditional must be parenthesised: '+' binds tighter than
+            // '==' and '?:', so without the parentheses the null test is
+            // applied to the concatenated string (never null) and the
+            // "The type " prefix is lost from the message.
+            String msg = "The type "
+                    + (typeToName.get(keyType) == null ? "" + keyType : typeToName.get(keyType))
+                    + " cannot be collected as a Key type";
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+        }
+    }
+
public static PigNullableWritable getWritableComparableTypes(Object o, byte keyType) throws ExecException{
byte newKeyType = keyType;
@@ -261,6 +303,14 @@ public class HDataType {
return wcKey;
}
+ public static byte findTypeFromClassName(String className) throws ExecException {
+ if (classToTypeMap.containsKey(className)) {
+ return classToTypeMap.get(className);
+ } else {
+ throw new ExecException("Unable to map " + className + " to known types." + Arrays.toString(classToTypeMap.keySet().toArray()));
+ }
+ }
+
public static byte findTypeFromNullableWritable(PigNullableWritable o) throws ExecException {
if (o instanceof NullableBooleanWritable)
return DataType.BOOLEAN;
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java?rev=1695398&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java Tue Aug 11 23:03:16 2015
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.pig.impl.io.NullablePartitionWritable;
+
+/**
+ * Holder for the comparators used on skewed-join keys, which are wrapped in a
+ * {@link NullablePartitionWritable}.
+ *
+ * Every nested comparator extends the plain comparator for its key type and
+ * adapts it in two ways:
+ * <ul>
+ * <li>the raw (byte-level) compare skips the first byte of each record, which
+ * is the type of the key, and delegates the rest of the record to the
+ * superclass;</li>
+ * <li>the object-level compare unwraps both arguments with
+ * {@link NullablePartitionWritable#getKey()} before delegating.</li>
+ * </ul>
+ *
+ * Explicit no-arg constructors that only called {@code super()} are omitted;
+ * the compiler-generated default constructor is equivalent.
+ */
+public class PigWritableComparators {
+
+    //
+    // Raw Comparators for Skewed Join
+    //
+
+    /** Comparator for boolean keys wrapped in a NullablePartitionWritable. */
+    public static class PigBooleanRawPartitionComparator extends PigBooleanRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the first byte which is the type of the key. The start
+            // moves forward by one, so the remaining length shrinks by one;
+            // passing the full length would run one byte past the record.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for int keys wrapped in a NullablePartitionWritable. */
+    public static class PigIntRawPartitionComparator extends PigIntRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for BigInteger keys wrapped in a NullablePartitionWritable. */
+    public static class PigBigIntegerRawPartitionComparator extends PigBigIntegerRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for BigDecimal keys wrapped in a NullablePartitionWritable. */
+    public static class PigBigDecimalRawPartitionComparator extends PigBigDecimalRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for long keys wrapped in a NullablePartitionWritable. */
+    public static class PigLongRawPartitionComparator extends PigLongRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for float keys wrapped in a NullablePartitionWritable. */
+    public static class PigFloatRawPartitionComparator extends PigFloatRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for double keys wrapped in a NullablePartitionWritable. */
+    public static class PigDoubleRawPartitionComparator extends PigDoubleRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for datetime keys wrapped in a NullablePartitionWritable. */
+    public static class PigDateTimeRawPartitionComparator extends PigDateTimeRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for chararray keys wrapped in a NullablePartitionWritable. */
+    public static class PigTextRawPartitionComparator extends PigTextRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for bytearray keys wrapped in a NullablePartitionWritable. */
+    public static class PigBytesRawPartitionComparator extends PigBytesRawComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    /** Comparator for tuple keys wrapped in a NullablePartitionWritable. */
+    public static class PigTupleSortPartitionComparator extends PigTupleSortComparator {
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Skip the key-type byte and shorten the length to match.
+            return super.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.pig.impl.io.NullablePartitionWritable;
+
+public class PigWritableComparators {
+
+ //
+ // Raw Comparators for Skewed Join
+ //
+ public static class PigBooleanRawPartitionComparator extends PigBooleanRawComparator {
+
+ public PigBooleanRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ // Skip the first byte which is the type of the key
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigIntRawPartitionComparator extends PigIntRawComparator {
+
+ public PigIntRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigBigIntegerRawPartitionComparator extends PigBigIntegerRawComparator {
+
+ public PigBigIntegerRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigBigDecimalRawPartitionComparator extends PigBigDecimalRawComparator {
+
+ public PigBigDecimalRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigLongRawPartitionComparator extends PigLongRawComparator {
+
+ public PigLongRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigFloatRawPartitionComparator extends PigFloatRawComparator {
+
+ public PigFloatRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigDoubleRawPartitionComparator extends PigDoubleRawComparator {
+
+ public PigDoubleRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigDateTimeRawPartitionComparator extends PigDateTimeRawComparator {
+
+ public PigDateTimeRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigTextRawPartitionComparator extends PigTextRawComparator {
+
+ public PigTextRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigBytesRawPartitionComparator extends PigBytesRawComparator {
+
+ public PigBytesRawPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigTupleSortPartitionComparator extends PigTupleSortComparator {
+
+ public PigTupleSortPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Aug 11 23:03:16 2015
@@ -58,7 +58,6 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
@@ -77,6 +76,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -1035,21 +1035,76 @@ public class TezDagBuilder extends TezOp
}
}
+    /**
+     * Selects the skewed-join raw comparator class for a key type.
+     *
+     * @param keyType one of the {@code DataType} byte constants
+     * @return the nested {@code PigWritableComparators} class registered for
+     *         {@code keyType}
+     * @throws JobCreationException with error code 1068 for map or bag keys,
+     *         and with error code 2036 for any key type not listed here
+     */
+    private static Class<? extends WritableComparator> getRawComparatorForSkewedJoin(byte keyType)
+            throws JobCreationException {
+
+        // Extended Raw Comparators for SkewedJoin which unwrap the NullablePartitionWritable
+        switch (keyType) {
+        // Supported key types: one comparator class each.
+        case DataType.BOOLEAN:
+            return PigWritableComparators.PigBooleanRawPartitionComparator.class;
+        case DataType.INTEGER:
+            return PigWritableComparators.PigIntRawPartitionComparator.class;
+        case DataType.LONG:
+            return PigWritableComparators.PigLongRawPartitionComparator.class;
+        case DataType.FLOAT:
+            return PigWritableComparators.PigFloatRawPartitionComparator.class;
+        case DataType.DOUBLE:
+            return PigWritableComparators.PigDoubleRawPartitionComparator.class;
+        case DataType.BIGINTEGER:
+            return PigWritableComparators.PigBigIntegerRawPartitionComparator.class;
+        case DataType.BIGDECIMAL:
+            return PigWritableComparators.PigBigDecimalRawPartitionComparator.class;
+        case DataType.DATETIME:
+            return PigWritableComparators.PigDateTimeRawPartitionComparator.class;
+        case DataType.CHARARRAY:
+            return PigWritableComparators.PigTextRawPartitionComparator.class;
+        case DataType.BYTEARRAY:
+            return PigWritableComparators.PigBytesRawPartitionComparator.class;
+        case DataType.TUPLE:
+            return PigWritableComparators.PigTupleSortPartitionComparator.class;
+
+        // Key types rejected as user input errors.
+        case DataType.MAP:
+            throw new JobCreationException("Using Map as key not supported.", 1068, PigException.INPUT);
+        case DataType.BAG:
+            throw new JobCreationException("Using Bag as key not supported.", 1068, PigException.INPUT);
+
+        // Anything else is reported as an internal error.
+        default:
+            throw new JobCreationException("Unhandled key type " + DataType.findTypeName(keyType),
+                    2036, PigException.BUG);
+        }
+    }
+
void selectOutputComparator(byte keyType, Configuration conf, TezOperator tezOp)
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
// TODO: Group comparators as in JobControlCompiler
- if (tezOp != null && tezOp.isUseSecondaryKey()) {
+ if (tezOp == null) {
+ return;
+ }
+ if (tezOp.isUseSecondaryKey()) {
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
PigSecondaryKeyComparator.class.getName());
setGroupingComparator(conf, PigSecondaryKeyGroupComparator.class.getName());
} else {
- if (tezOp != null && tezOp.isSkewedJoin()) {
- // TODO: PigGroupingPartitionWritableComparator only used as Group comparator in MR.
- // What should be TEZ_RUNTIME_KEY_COMPARATOR_CLASS if same as MR?
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
- PigGroupingPartitionWritableComparator.class.getName());
- setGroupingComparator(conf, PigGroupingPartitionWritableComparator.class.getName());
+ if (tezOp.isSkewedJoin()) {
+ conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+ getRawComparatorForSkewedJoin(keyType), RawComparator.class);
} else {
conf.setClass(
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
Modified: pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java Tue Aug 11 23:03:16 2015
@@ -70,9 +70,9 @@ public class NullablePartitionWritable e
@Override
public void readFields(DataInput in) throws IOException {
- String c = in.readUTF();
+ byte type = in.readByte();
try {
- key = HDataType.getWritableComparable(c);
+ key = HDataType.getNewWritableComparable(type);
} catch(Exception e) {
throw new IOException(e);
}
@@ -81,7 +81,7 @@ public class NullablePartitionWritable e
@Override
public void write(DataOutput out) throws IOException {
- out.writeUTF(key.getClass().getName());
+ out.writeByte(HDataType.findTypeFromClassName(key.getClass().getName()));
key.write(out);
}