You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/08/18 21:56:27 UTC
svn commit: r1696491 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/
src/org/apache/pig/backend/had...
Author: rohini
Date: Tue Aug 18 19:56:26 2015
New Revision: 1696491
URL: http://svn.apache.org/r1696491
Log:
PIG-4657: [Pig on Tez] Optimize GroupBy and Distinct key comparison (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
pig/trunk/test/e2e/pig/tests/nightly.conf
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 18 19:56:26 2015
@@ -462,6 +462,9 @@ PIG-3939: SPRINTF function to format str
PIG-3970: Merge Tez branch into trunk (daijy)
OPTIMIZATIONS
+
+PIG-4657: [Pig on Tez] Optimize GroupBy and Distinct key comparison (rohini)
+
BUG FIXES
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java Tue Aug 18 19:56:26 2015
@@ -17,12 +17,341 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.impl.io.NullablePartitionWritable;
public class PigWritableComparators {
+ // Byte-only raw comparators for faster comparison in non-orderby jobs. We do not re-use
+ // JobControlCompiler.Pig<DataType>WritableComparator, which extend PigWritableComparator.
+ // Those use PigNullablePartitionWritable.compareTo, which is not that efficient in cases like
+ // tuple, where the tuple is iterated for null checking instead of taking advantage of
+ // TupleRawComparator.hasComparedTupleNull(). This also skips multi-query index checking.
+
+ public static class PigBooleanRawBytesComparator extends PigBooleanRawComparator {
+
+ public PigBooleanRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigIntRawBytesComparator extends PigIntRawComparator {
+
+ public PigIntRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigBigIntegerRawBytesComparator extends PigBigIntegerRawComparator {
+
+ public PigBigIntegerRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigBigDecimalRawBytesComparator extends PigBigDecimalRawComparator {
+
+ public PigBigDecimalRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigLongRawBytesComparator extends PigLongRawComparator {
+
+ public PigLongRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigFloatRawBytesComparator extends PigFloatRawComparator {
+
+ public PigFloatRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigDoubleRawBytesComparator extends PigDoubleRawComparator {
+
+ public PigDoubleRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigDateTimeRawBytesComparator extends PigDateTimeRawComparator {
+
+ public PigDateTimeRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigTextRawBytesComparator extends PigTextRawComparator {
+
+ public PigTextRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigBytesRawBytesComparator extends PigBytesRawComparator {
+
+ public PigBytesRawBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ public static class PigTupleSortBytesComparator extends PigTupleSortComparator {
+
+ public PigTupleSortBytesComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+ }
+
+ //
+ // Byte only raw comparators for faster comparison for Skewed Join.
+ //
+ public static class PigBooleanRawBytesPartitionComparator extends PigBooleanRawComparator {
+
+ public PigBooleanRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigIntRawBytesPartitionComparator extends PigIntRawComparator {
+
+ public PigIntRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigBigIntegerRawBytesPartitionComparator extends PigBigIntegerRawComparator {
+
+ public PigBigIntegerRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigBigDecimalRawBytesPartitionComparator extends PigBigDecimalRawComparator {
+
+ public PigBigDecimalRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigLongRawBytesPartitionComparator extends PigLongRawComparator {
+
+ public PigLongRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigFloatRawBytesPartitionComparator extends PigFloatRawComparator {
+
+ public PigFloatRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigDoubleRawBytesPartitionComparator extends PigDoubleRawComparator {
+
+ public PigDoubleRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigDateTimeRawBytesPartitionComparator extends PigDateTimeRawComparator {
+
+ public PigDateTimeRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigTextRawBytesPartitionComparator extends PigTextRawComparator {
+
+ public PigTextRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigBytesRawBytesPartitionComparator extends PigBytesRawComparator {
+
+ public PigBytesRawBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
+ public static class PigTupleSortBytesPartitionComparator extends PigTupleSortComparator {
+
+ public PigTupleSortBytesPartitionComparator() {
+ super();
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return super.compare(((NullablePartitionWritable)o1).getKey(), ((NullablePartitionWritable)o2).getKey());
+ }
+ }
+
//
- // Raw Comparators for Skewed Join
+ // Raw Comparators for Skewed Join
//
public static class PigBooleanRawPartitionComparator extends PigBooleanRawComparator {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Aug 18 19:56:26 2015
@@ -283,7 +283,7 @@ public class TezDagBuilder extends TezOp
if (tezOp.isVertexGroup()) {
groupMembers[i] = from;
} else {
- EdgeProperty prop = newEdge(pred, tezOp);
+ EdgeProperty prop = newEdge(pred, tezOp, false);
Edge edge = Edge.create(from, to, prop);
dag.addEdge(edge);
}
@@ -311,7 +311,7 @@ public class TezDagBuilder extends TezOp
private GroupInputEdge newGroupInputEdge(TezOperator fromOp,
TezOperator toOp, VertexGroup from, Vertex to) throws IOException {
- EdgeProperty edgeProperty = newEdge(fromOp, toOp);
+ EdgeProperty edgeProperty = newEdge(fromOp, toOp, true);
String groupInputClass = ConcatenatedMergedKeyValueInput.class.getName();
@@ -334,7 +334,7 @@ public class TezDagBuilder extends TezOp
* @return EdgeProperty
* @throws IOException
*/
- private EdgeProperty newEdge(TezOperator from, TezOperator to)
+ private EdgeProperty newEdge(TezOperator from, TezOperator to, boolean isMergedInput)
throws IOException {
TezEdgeDescriptor edge = to.inEdges.get(from.getOperatorKey());
PhysicalPlan combinePlan = edge.combinePlan;
@@ -345,7 +345,7 @@ public class TezDagBuilder extends TezOp
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
UDFContext.getUDFContext().serialize(conf);
if (!combinePlan.isEmpty()) {
- addCombiner(combinePlan, to, conf);
+ addCombiner(combinePlan, to, conf, isMergedInput);
}
List<POLocalRearrangeTez> lrs = PlanHelper.getPhysicalOperators(from.plan,
@@ -354,7 +354,7 @@ public class TezDagBuilder extends TezOp
for (POLocalRearrangeTez lr : lrs) {
if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
byte keyType = lr.getKeyType();
- setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage());
+ setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
// In case of secondary key sort, main key type is the actual key type
conf.set("pig.reduce.key.type", Byte.toString(lr.getMainKeyType()));
break;
@@ -435,11 +435,11 @@ public class TezDagBuilder extends TezOp
}
private void addCombiner(PhysicalPlan combinePlan, TezOperator pkgTezOp,
- Configuration conf) throws IOException {
+ Configuration conf, boolean isMergedInput) throws IOException {
POPackage combPack = (POPackage) combinePlan.getRoots().get(0);
POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
.getLeaves().get(0);
- setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp);
+ setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp, true, isMergedInput);
LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
combinePlan, null, pkgTezOp, combPack);
@@ -538,13 +538,14 @@ public class TezDagBuilder extends TezOp
byte keyType = pack.getPkgr().getKeyType();
tezOp.plan.remove(pack);
payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
- setIntermediateOutputKeyValue(keyType, payloadConf, tezOp);
+
POShuffleTezLoad newPack = new POShuffleTezLoad(pack);
if (tezOp.isSkewedJoin()) {
newPack.setSkewedJoins(true);
}
tezOp.plan.add(newPack);
+ boolean isMergedInput = false;
// Set input keys for POShuffleTezLoad. This is used to identify
// the inputs that are attached to the POShuffleTezLoad in the
// backend.
@@ -554,7 +555,9 @@ public class TezDagBuilder extends TezOp
// skip sample vertex input
} else {
String inputKey = pred.getOperatorKey().toString();
+ boolean isVertexGroup = false;
if (pred.isVertexGroup()) {
+ isVertexGroup = true;
pred = mPlan.getOperator(pred.getVertexGroupMembers().get(0));
}
LinkedList<POLocalRearrangeTez> lrs =
@@ -563,6 +566,9 @@ public class TezDagBuilder extends TezOp
if (lr.isConnectedToPackage()
&& lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
localRearrangeMap.put((int) lr.getIndex(), inputKey);
+ if (isVertexGroup) {
+ isMergedInput = true;
+ }
}
}
}
@@ -577,7 +583,8 @@ public class TezDagBuilder extends TezOp
}
}
- setIntermediateOutputKeyValue(pack.getPkgr().getKeyType(), payloadConf, tezOp);
+ //POShuffleTezLoad accesses the comparator setting
+ selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
} else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) {
POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0);
// TODO Need to fix multiple input key mapping
@@ -953,14 +960,9 @@ public class TezDagBuilder extends TezOp
return stores;
}
- private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
- throws JobCreationException, ExecException {
- setIntermediateOutputKeyValue(keyType, conf, tezOp, true);
- }
-
@SuppressWarnings("rawtypes")
private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp,
- boolean isConnectedToPackage) throws JobCreationException, ExecException {
+ boolean isConnectedToPackage, boolean isMergedInput) throws JobCreationException, ExecException {
if (tezOp != null && tezOp.isUseSecondaryKey() && isConnectedToPackage) {
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
NullableTuple.class.getName());
@@ -978,12 +980,86 @@ public class TezDagBuilder extends TezOp
NullableTuple.class.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
MRPartitioner.class.getName());
- selectOutputComparator(keyType, conf, tezOp);
+ selectKeyComparator(keyType, conf, tezOp, isMergedInput);
+ }
+
+ private static Class<? extends WritableComparator> getRawBytesComparator(
+ byte keyType) throws JobCreationException {
+
+ // These comparators compare only bytes, and we will use them for faster
+ // sorting in all cases except order by.
+ // The resulting ordering is good enough to be fed to the reducer (POShuffleTezLoad),
+ // which will use the full comparator (GroupingComparator) for correct
+ // sorting and grouping.
+ // TODO: PIG-4652. Until Tez exposes a way to get the bytes of the keys being compared,
+ // we can use this only for groupby and distinct, which are single inputs in
+ // POShuffleTezLoad, and not for join, which has multiple inputs.
+
+ switch (keyType) {
+ case DataType.BOOLEAN:
+ return PigWritableComparators.PigBooleanRawBytesComparator.class;
+
+ case DataType.INTEGER:
+ return PigWritableComparators.PigIntRawBytesComparator.class;
+
+ case DataType.BIGINTEGER:
+ return PigWritableComparators.PigBigIntegerRawBytesComparator.class;
+
+ case DataType.BIGDECIMAL:
+ return PigWritableComparators.PigBigDecimalRawBytesComparator.class;
+
+ case DataType.LONG:
+ return PigWritableComparators.PigLongRawBytesComparator.class;
+
+ case DataType.FLOAT:
+ return PigWritableComparators.PigFloatRawBytesComparator.class;
+
+ case DataType.DOUBLE:
+ return PigWritableComparators.PigDoubleRawBytesComparator.class;
+
+ case DataType.DATETIME:
+ return PigWritableComparators.PigDateTimeRawBytesComparator.class;
+
+ case DataType.CHARARRAY:
+ return PigWritableComparators.PigTextRawBytesComparator.class;
+
+ case DataType.BYTEARRAY:
+ return PigWritableComparators.PigBytesRawBytesComparator.class;
+
+ case DataType.MAP:
+ int errCode = 1068;
+ String msg = "Using Map as key not supported.";
+ throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+ case DataType.TUPLE:
+ return PigWritableComparators.PigTupleSortBytesComparator.class;
+
+ case DataType.BAG:
+ errCode = 1068;
+ msg = "Using Bag as key not supported.";
+ throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+ default:
+ errCode = 2036;
+ msg = "Unhandled key type " + DataType.findTypeName(keyType);
+ throw new JobCreationException(msg, errCode, PigException.BUG);
+ }
}
private static Class<? extends WritableComparator> getRawComparator(byte keyType)
throws JobCreationException {
+ // These are the full comparators, used in order by jobs and as the GroupingComparator in
+ // POShuffleTezLoad for other operations.
+
+ // MapReduce uses PigGrouping<DataType>WritableComparator for non-orderby jobs.
+ // In Tez, we use the raw comparators themselves on the reduce side as well, since they are
+ // now fixed to handle nulls for different indexes.
+ // Also, PigGrouping<DataType>WritableComparator uses PigNullablePartitionWritable.compareTo,
+ // which is not that efficient for cases like tuple, where the tuple is iterated for null
+ // checking instead of taking advantage of TupleRawComparator.hasComparedTupleNull().
+ // Using the raw comparators also skips multi-query index checking.
+
switch (keyType) {
case DataType.BOOLEAN:
return PigBooleanRawComparator.class;
@@ -1035,6 +1111,61 @@ public class TezDagBuilder extends TezOp
}
}
+ private static Class<? extends WritableComparator> getRawBytesComparatorForSkewedJoin(byte keyType)
+ throws JobCreationException {
+
+ // Extended Raw Bytes Comparators for SkewedJoin which unwrap the NullablePartitionWritable
+ switch (keyType) {
+ case DataType.BOOLEAN:
+ return PigWritableComparators.PigBooleanRawBytesPartitionComparator.class;
+
+ case DataType.INTEGER:
+ return PigWritableComparators.PigIntRawBytesPartitionComparator.class;
+
+ case DataType.BIGINTEGER:
+ return PigWritableComparators.PigBigIntegerRawBytesPartitionComparator.class;
+
+ case DataType.BIGDECIMAL:
+ return PigWritableComparators.PigBigDecimalRawBytesPartitionComparator.class;
+
+ case DataType.LONG:
+ return PigWritableComparators.PigLongRawBytesPartitionComparator.class;
+
+ case DataType.FLOAT:
+ return PigWritableComparators.PigFloatRawBytesPartitionComparator.class;
+
+ case DataType.DOUBLE:
+ return PigWritableComparators.PigDoubleRawBytesPartitionComparator.class;
+
+ case DataType.DATETIME:
+ return PigWritableComparators.PigDateTimeRawBytesPartitionComparator.class;
+
+ case DataType.CHARARRAY:
+ return PigWritableComparators.PigTextRawBytesPartitionComparator.class;
+
+ case DataType.BYTEARRAY:
+ return PigWritableComparators.PigBytesRawBytesPartitionComparator.class;
+
+ case DataType.MAP:
+ int errCode = 1068;
+ String msg = "Using Map as key not supported.";
+ throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+ case DataType.TUPLE:
+ return PigWritableComparators.PigTupleSortBytesPartitionComparator.class;
+
+ case DataType.BAG:
+ errCode = 1068;
+ msg = "Using Bag as key not supported.";
+ throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+ default:
+ errCode = 2036;
+ msg = "Unhandled key type " + DataType.findTypeName(keyType);
+ throw new JobCreationException(msg, errCode, PigException.BUG);
+ }
+ }
+
private static Class<? extends WritableComparator> getRawComparatorForSkewedJoin(byte keyType)
throws JobCreationException {
@@ -1090,7 +1221,7 @@ public class TezDagBuilder extends TezOp
}
}
- void selectOutputComparator(byte keyType, Configuration conf, TezOperator tezOp)
+ void selectKeyComparator(byte keyType, Configuration conf, TezOperator tezOp, boolean isMergedInput)
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
// TODO: Group comparators as in JobControlCompiler
@@ -1102,7 +1233,13 @@ public class TezDagBuilder extends TezOp
PigSecondaryKeyComparator.class.getName());
setGroupingComparator(conf, PigSecondaryKeyGroupComparator.class.getName());
} else {
- if (tezOp.isSkewedJoin()) {
+ // If the input is not a merged input (OrderedGroupedMergedKVInput) from a union, then
+ // use the bytes-only comparator. This is temporary until PIG-4652 is done.
+ if (!isMergedInput && (tezOp.isGroupBy() || tezOp.isDistinct())) {
+ conf.setClass(
+ TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+ getRawBytesComparator(keyType), RawComparator.class);
+ } else if (tezOp.isSkewedJoin()) {
conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
getRawComparatorForSkewedJoin(keyType), RawComparator.class);
} else {
@@ -1110,9 +1247,58 @@ public class TezDagBuilder extends TezOp
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
getRawComparator(keyType), RawComparator.class);
}
+
+ // Comparators currently in use:
+ // groupby/distinct : Comparator - RawBytesComparator
+ // groupby/distinct after union : Comparator - RawComparator
+ // orderby : Comparator - RawComparator
+ // skewed join : Comparator - RawPartitionComparator
+ // Rest (other joins) : Comparator - RawComparator
+
+ // TODO (PIG-4652): once Tez supports exposing key bytes, switch to:
+ // groupby/distinct : Comparator - RawBytesComparator. No grouping comparator required.
+ // orderby : Comparator - RawComparator. No grouping comparator required.
+ // skewed join : Comparator - RawBytesPartitionComparator, GroupingComparator - RawPartitionComparator
+ // Rest (other joins) : Comparator - RawBytesComparator, GroupingComparator - RawComparator
+
+ /*
+ if (tezOp.isSkewedJoin()) {
+ conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+ getRawBytesComparatorForSkewedJoin(keyType), RawComparator.class);
+ setGroupingComparator(conf, getRawComparatorForSkewedJoin(keyType).getName());
+ } else if (tezOp.isGroupBy() || tezOp.isDistinct()) {
+ conf.setClass(
+ TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+ getRawBytesComparator(keyType), RawComparator.class);
+ } else if (hasOrderby(tezOp)) {
+ conf.setClass(
+ TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+ getRawComparator(keyType), RawComparator.class);
+ } else {
+ conf.setClass(
+ TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+ getRawBytesComparator(keyType), RawComparator.class);
+ setGroupingComparator(conf, getRawComparator(keyType).getName());
+ }
+ */
}
}
+ private boolean hasOrderby(TezOperator tezOp) {
+ boolean hasOrderBy = tezOp.isGlobalSort() || tezOp.isLimitAfterSort();
+ if (!hasOrderBy) {
+ // Check if it is an order by sampler job
+ List<TezOperator> succs = getPlan().getSuccessors(tezOp);
+ if (succs != null && succs.size() == 1) {
+ if (succs.get(0).isGlobalSort()) {
+ hasOrderBy = true;
+ }
+ }
+ }
+ return hasOrderBy;
+ }
+
+
private void setGroupingComparator(Configuration conf, String comparatorClass) {
// In MR - job.setGroupingComparatorClass() or MRJobConfig.GROUP_COMPARATOR_CLASS
// TODO: Check why tez-mapreduce ReduceProcessor use two different tez
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Tue Aug 18 19:56:26 2015
@@ -688,6 +688,7 @@ public class TezCompiler extends PhyPlan
clr.setDistinct(true);
combinePlan.addAsLeaf(clr);
+ curTezOp.markDistinct();
addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
curTezOp.setRequestedParallelism(op.getRequestedParallelism());
phyToTezOpMap.put(op, curTezOp);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Tue Aug 18 19:56:26 2015
@@ -173,6 +173,8 @@ public class TezOperator extends Operato
LIMIT_AFTER_SORT,
// Indicate if this job is a union job
UNION,
+ // Indicate if this job is a distinct job
+ DISTINCT,
// Indicate if this job is a native job
NATIVE;
};
@@ -420,6 +422,14 @@ public class TezOperator extends Operato
feature.set(OPER_FEATURE.UNION.ordinal());
}
+ public boolean isDistinct() {
+ return feature.get(OPER_FEATURE.DISTINCT.ordinal());
+ }
+
+ public void markDistinct() {
+ feature.set(OPER_FEATURE.DISTINCT.ordinal());
+ }
+
public boolean isNative() {
return feature.get(OPER_FEATURE.NATIVE.ordinal());
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Tue Aug 18 19:56:26 2015
@@ -58,6 +58,7 @@ public class POShuffleTezLoad extends PO
private transient boolean[] finished;
private transient boolean[] readOnce;
private transient WritableComparator comparator = null;
+ private transient WritableComparator groupingComparator = null;
private transient Configuration conf;
private transient int accumulativeBatchSize;
@@ -87,7 +88,8 @@ public class POShuffleTezLoad extends PO
this.conf = conf;
this.inputs = new ArrayList<LogicalInput>();
this.readers = new ArrayList<KeyValuesReader>();
- this.comparator = (WritableComparator) ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
+ this.comparator = (WritableComparator) ConfigUtils.getIntermediateInputKeyComparator(conf);
+ this.groupingComparator = (WritableComparator) ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
this.accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
try {
@@ -135,13 +137,25 @@ public class POShuffleTezLoad extends PO
PigNullableWritable min = null;
try {
- for (int i = 0; i < numTezInputs; i++) {
- if (!finished[i]) {
+ if (numTezInputs == 1) {
+ if (!finished[0]) {
hasData = true;
- cur = readers.get(i).getCurrentKey();
- if (min == null || comparator.compare(min, cur) > 0) {
- //Not a deep clone. Writable is referenced.
- min = ((PigNullableWritable)cur).clone();
+ cur = readers.get(0).getCurrentKey();
+ // Just move to the next key without comparison
+ min = ((PigNullableWritable)cur).clone();
+ }
+ } else {
+ for (int i = 0; i < numTezInputs; i++) {
+ if (!finished[i]) {
+ hasData = true;
+ cur = readers.get(i).getCurrentKey();
+ // TODO: PIG-4652 should compare key bytes instead
+ // of deserialized objects when using BytesComparator
+ // for faster comparison
+ if (min == null || comparator.compare(min, cur) > 0) {
+ //Not a deep clone. Writable is referenced.
+ min = ((PigNullableWritable)cur).clone();
+ }
}
}
}
@@ -177,24 +191,40 @@ public class POShuffleTezLoad extends PO
bags[i] = new InternalCachedBag(numInputs);
}
- for (int i = 0; i < numTezInputs; i++) {
-
- if (!finished[i]) {
- cur = readers.get(i).getCurrentKey();
- // We need to loop in case of Grouping Comparators
- while (comparator.compare(min, cur) == 0) {
- Iterable<Object> vals = readers.get(i).getCurrentValues();
- for (Object val : vals) {
- NullableTuple nTup = (NullableTuple) val;
- int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
- bags[index].add(tup);
- }
- finished[i] = !readers.get(i).next();
- if (finished[i]) {
- break;
- }
+ if (numTezInputs == 1) {
+ do {
+ Iterable<Object> vals = readers.get(0).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[0] = !readers.get(0).next();
+ if (finished[0]) {
+ break;
+ }
+ cur = readers.get(0).getCurrentKey();
+ } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+ } else {
+ for (int i = 0; i < numTezInputs; i++) {
+ if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
+ // We need to loop in case of Grouping Comparators
+ while (groupingComparator.compare(min, cur) == 0) {
+ Iterable<Object> vals = readers.get(i).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[i] = !readers.get(i).next();
+ if (finished[i]) {
+ break;
+ }
+ cur = readers.get(i).getCurrentKey();
+ }
}
}
}
@@ -264,7 +294,7 @@ public class POShuffleTezLoad extends PO
for (int i = 0; i < numTezInputs; i++) {
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
- if (comparator.compare(min, cur) == 0) {
+ if (groupingComparator.compare(min, cur) == 0) {
return true;
}
}
@@ -287,7 +317,7 @@ public class POShuffleTezLoad extends PO
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
int batchCount = 0;
- while (comparator.compare(min, cur) == 0) {
+ while (groupingComparator.compare(min, cur) == 0) {
Iterator<Object> iter = readers.get(i).getCurrentValues().iterator();
while (iter.hasNext() && batchCount < batchSize) {
NullableTuple nTup = (NullableTuple) iter.next();
@@ -328,7 +358,7 @@ public class POShuffleTezLoad extends PO
for (int i = 0; i < numTezInputs; i++) {
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
- while (comparator.compare(min, cur) == 0) {
+ while (groupingComparator.compare(min, cur) == 0) {
finished[i] = !readers.get(i).next();
if (finished[i]) {
break;
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1696491&r1=1696490&r2=1696491&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Tue Aug 18 19:56:26 2015
@@ -1552,6 +1552,15 @@ d = cross a, c;
e = union b, d;
store e into ':OUTPATH:';\,
},
+ {
+ # Union + Distinct: store the DISTINCT result (d) so the distinct-after-union path is actually run
+ 'num' => 16,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = distinct c;
+store d into ':OUTPATH:';\,
+ }
]
},
{