You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/11 07:23:33 UTC
svn commit: r1723978 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
src/org/apache/pig/d...
Author: rohini
Date: Mon Jan 11 06:23:33 2016
New Revision: 1723978
URL: http://svn.apache.org/viewvc?rev=1723978&view=rev
Log:
PIG-4773: [Pig on Tez] Secondary key descending sort in nested foreach after union does ascending instead (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
pig/trunk/src/org/apache/pig/data/BinInterSedes.java
pig/trunk/test/e2e/pig/tests/nightly.conf
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 06:23:33 2016
@@ -79,6 +79,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4773: [Pig on Tez] Secondary key descending sort in nested foreach after union does ascending instead (rohini)
+
PIG-4774: Fix NPE in SUM,AVG,MIN,MAX UDFs for null bag input (rohini)
PIG-4757: Job stats on successfully read/output records wrong with multiple inputs/outputs (rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java Mon Jan 11 06:23:33 2016
@@ -17,19 +17,18 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.TupleRawComparator;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
public class PigSecondaryKeyComparator extends WritableComparator implements Configurable {
- private final Log mLog = LogFactory.getLog(getClass());
+
private TupleRawComparator mComparator=null;
@Override
@@ -54,6 +53,7 @@ public class PigSecondaryKeyComparator e
return null;
}
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// the last byte of a NullableTuple is its Index
@@ -84,6 +84,47 @@ public class PigSecondaryKeyComparator e
rc = -1;
else
rc = 1;
+ }
+ return rc;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compare(WritableComparable a, WritableComparable b)
+ {
+ PigNullableWritable wa = (PigNullableWritable)a;
+ PigNullableWritable wb = (PigNullableWritable)b;
+
+ if ((wa.getIndex() & PigNullableWritable.mqFlag) != 0) { // this is a multi-query index
+ if ((wa.getIndex() & PigNullableWritable.idxSpace) < (wb.getIndex() & PigNullableWritable.idxSpace))
+ return -1;
+ else if ((wa.getIndex() & PigNullableWritable.idxSpace) > (wb.getIndex() & PigNullableWritable.idxSpace))
+ return 1;
+ // If equal, we fall through
+ }
+
+ int rc = 0;
+ // If either are null, handle differently.
+ if (!wa.isNull() && !wb.isNull()) {
+ rc = mComparator.compare((Tuple) wa.getValueAsPigType(), (Tuple) wb.getValueAsPigType());
+ // handle PIG-927
+ // if tuples are equal but any field inside tuple is null, then we do not merge keys
+ if (rc == 0 && mComparator.hasComparedTupleNull())
+ rc = (wa.getIndex() & PigNullableWritable.idxSpace) - (wb.getIndex() & PigNullableWritable.idxSpace);
+ } else {
+ // Two nulls are equal if indices are same
+ if (wa.isNull() && wb.isNull()) {
+ if ((wa.getIndex() & PigNullableWritable.idxSpace) < (wb.getIndex() & PigNullableWritable.idxSpace))
+ rc = -1;
+ else if ((wa.getIndex() & PigNullableWritable.idxSpace) > (wb.getIndex() & PigNullableWritable.idxSpace))
+ rc = 1;
+ else
+ rc = 0;
+ }
+ else if (wa.isNull())
+ rc = -1;
+ else
+ rc = 1;
}
return rc;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jan 11 06:23:33 2016
@@ -584,6 +584,7 @@ public class TezDagBuilder extends TezOp
// the inputs that are attached to the POShuffleTezLoad in the
// backend.
Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>();
+ TezOperator from = null;
for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
if (tezOp.getSampleOperator() != null && tezOp.getSampleOperator() == pred) {
// skip sample vertex input
@@ -603,6 +604,7 @@ public class TezDagBuilder extends TezOp
if (isVertexGroup) {
isMergedInput = true;
}
+ from = pred;
}
}
}
@@ -619,6 +621,16 @@ public class TezDagBuilder extends TezOp
//POShuffleTezLoad accesses the comparator setting
selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
+
+ if (tezOp.isUseSecondaryKey()) {
+ TezEdgeDescriptor edge = tezOp.inEdges.get(from.getOperatorKey());
+ // Currently only PigSecondaryKeyGroupingComparator is used in POShuffleTezLoad.
+ // When PIG-4685: SecondaryKeyOptimizerTez does not optimize cogroup is fixed
+ // in future, PigSecondaryKeyComparator will have to be used and that will require this.
+ payloadConf.set("pig.secondarySortOrder", ObjectSerializer
+ .serialize(edge.getSecondarySortOrder()));
+ }
+
}
// set parent plan in all operators. currently the parent plan is really
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Mon Jan 11 06:23:33 2016
@@ -56,12 +56,13 @@ public class SecondaryKeyOptimizerTez ex
return;
}
+ // TODO: PIG-4685: SecondaryKeyOptimizerTez does not optimize cogroup
// Current code does not handle more than one predecessors
// even though it is possible. The problem is when we
// process the first predecessor, we remove the foreach inner
// operators from the reduce side, and the second predecessor
// cannot see them
- if (predecessors.size()>1) {
+ if (predecessors.size() > 1) {
return;
}
TezOperator from = predecessors.get(0);
Modified: pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/trunk/src/org/apache/pig/data/BinInterSedes.java Mon Jan 11 06:23:33 2016
@@ -1072,7 +1072,7 @@ public class BinInterSedes implements In
// we have a compound tuple key (main_key, secondary_key). Each key has its own sort order, so
// we have to deal with them separately. We delegate it to the first invocation of
// compareDatum()
- assert (tsz1 == 3); // main_key, secondary_key, value
+ assert (tsz1 == 2); // main_key, secondary_key
result = compareDatum(t1.get(0), t2.get(0), mAsc);
if (result == 0)
result = compareDatum(t1.get(1), t2.get(1), mSecondaryAsc);
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1723978&r1=1723977&r2=1723978&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon Jan 11 06:23:33 2016
@@ -1424,8 +1424,11 @@ store e into ':OUTPATH:';\,
b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
c = union a, b;
d = group c by name;
-e = foreach d { f = order c by $1,$2; generate group, f; };
-store e into ':OUTPATH:';\,
+d1 = group c by name; -- Two separate groupbys to ensure secondary key partitioner
+e = foreach d { f = order c by age, gpa ; g = limit f 1; generate g; };
+h = foreach d1 { i = order c by age asc, gpa desc; j = limit i 1; generate j; };
+store e into ':OUTPATH:.1';
+store h into ':OUTPATH:.2';\,
},
{
# Union + Orderby