You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/17 09:02:44 UTC
svn commit: r1568895 - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/tez/
test/org/apache/pig/test/data/GoldenFiles/ test/org/apache/pig/tez/
Author: rohini
Date: Mon Feb 17 08:02:44 2014
New Revision: 1568895
URL: http://svn.apache.org/r1568895
Log:
PIG-3766: Use ONE_TO_ONE edge and IdentityInOut in skewed join intermediate vertex (rohini)
Added:
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java Mon Feb 17 08:02:44 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.impl.io.NullablePartitionWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
@@ -99,14 +101,31 @@ public class POIdentityInOutTez extends
while (shuffleReader.next()) {
Object curKey = shuffleReader.getCurrentKey();
Iterable<Object> vals = shuffleReader.getCurrentValues();
+ if (isSkewedJoin) {
+ NullablePartitionWritable wrappedKey = new NullablePartitionWritable(
+ (PigNullableWritable) curKey);
+ wrappedKey.setPartition(-1);
+ curKey = wrappedKey;
+ }
for (Object val : vals) {
writer.write(curKey, val);
}
}
} else {
while (reader.next()) {
- writer.write(reader.getCurrentKey(),
- reader.getCurrentValue());
+ if (isSkewedJoin) {
+ NullablePartitionWritable wrappedKey = new NullablePartitionWritable(
+ (PigNullableWritable) reader.getCurrentKey());
+ // Skewed join wraps key with NullablePartitionWritable
+ // The partitionIndex in NullablePartitionWritable is not serialized.
+ // So setting it here instead of the previous vertex POLocalRearrangeTez.
+ // Serializing it would add overhead for MR as well.
+ wrappedKey.setPartition(-1);
+ writer.write(wrappedKey, reader.getCurrentValue());
+ } else {
+ writer.write(reader.getCurrentKey(),
+ reader.getCurrentValue());
+ }
}
}
return RESULT_EOP;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Mon Feb 17 08:02:44 2014
@@ -28,7 +28,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -50,8 +49,8 @@ public class POLocalRearrangeTez extends
protected transient KeyValueWriter writer;
// Tez union is implemented as LR + Pkg
- private boolean isUnion = false;
- private boolean isSkewedJoin = false;
+ protected boolean isUnion = false;
+ protected boolean isSkewedJoin = false;
public POLocalRearrangeTez(OperatorKey k) {
super(k);
@@ -63,6 +62,13 @@ public class POLocalRearrangeTez extends
public POLocalRearrangeTez(POLocalRearrange copy) {
super(copy);
+ if (copy instanceof POLocalRearrangeTez) {
+ POLocalRearrangeTez copyTez = (POLocalRearrangeTez) copy;
+ this.isUnion = copyTez.isUnion;
+ this.isSkewedJoin = copyTez.isSkewedJoin;
+ this.outputKey = copyTez.outputKey;
+ }
+
}
public String getOutputKey() {
@@ -140,12 +146,6 @@ public class POLocalRearrangeTez extends
val = new NullableTuple((Tuple)result.get(1));
} else {
key = HDataType.getWritableComparableTypes(result.get(1), keyType);
- if (isSkewedJoin) {
- // Skewed join wraps key with NullablePartitionWritable
- NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
- wrappedKey.setPartition(-1);
- key = wrappedKey;
- }
val = new NullableTuple((Tuple)result.get(2));
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Feb 17 08:02:44 2014
@@ -36,10 +36,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.data.BinSedesTuple;
import org.apache.pig.data.SchemaTupleBackend;
-import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.tez.common.TezUtils;
@@ -229,8 +228,7 @@ public class PigProcessor implements Log
Object val = reader.getCurrentValue();
if (val != null) {
// Sample is not empty
- NullableTuple nTup = (NullableTuple) val;
- Tuple t = (Tuple) nTup.getValueAsPigType();
+ BinSedesTuple t = (BinSedesTuple) val;
sampleMap = (Map<String, Object>) t.get(0);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Feb 17 08:02:44 2014
@@ -1238,8 +1238,12 @@ public class TezCompiler extends PhyPlan
public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
//TODO: handle split and connect
try {
+
+ // The first vertex (prevOp) loads the left table and sends sample of join keys to
+ // vertex 2 (sampler vertex) and all data to vertex 3 (partition vertex) via 1-1 edge
+
// LR that transfers loaded input to partition vertex
- POLocalRearrangeTez lrTez = localRearrangeFactory.create(LocalRearrangeType.NULL);
+ POLocalRearrangeTez lrTez = new POLocalRearrangeTez(OperatorKey.genOpKey(scope));
// LR that broadcasts sampled input to sampling aggregation vertex
POLocalRearrangeTez lrTezSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
@@ -1311,8 +1315,8 @@ public class TezCompiler extends PhyPlan
String mc = pigProperties.getProperty("pig.skewedjoin.reduce.maxtuple", "0");
int rp = Math.max(op.getRequestedParallelism(), 1);
- Pair<TezOperator, Integer> sampleJobPair = getSamplingAggregationJobs(sort, rp, null,
- PartitionSkewedKeys.class.getName(), new String[]{per, mc}, 2);
+ Pair<TezOperator, Integer> sampleJobPair = getSamplingAggregationJob(sort, rp, null,
+ PartitionSkewedKeys.class.getName(), new String[]{per, mc});
rp = sampleJobPair.second;
// Set parallelism of SkewedJoin as the value calculated by sampling
@@ -1329,17 +1333,8 @@ public class TezCompiler extends PhyPlan
blocking();
- // Then add a POPackage and a POForEach to the start of the new tezOp.
- POPackage pkg = getPackage(1, DataType.BYTEARRAY);
- curTezOp.plan.add(pkg);
-
- POProject project = new POProject(OperatorKey.genOpKey(scope));
- project.setResultType(DataType.BAG);
- project.setStar(false);
- project.setColumn(1);
- POForEach forEach =
- TezCompilerUtil.getForEach(project, sort.getRequestedParallelism(), scope, nig);
- curTezOp.plan.addAsLeaf(forEach);
+ // Add a POIdentityInOutTez to the joinJobs[0] which is a partition vertex.
+ // It just partitions the data from first vertex based on the quantiles from sample vertex.
joinJobs[0] = curTezOp;
// Run POLocalRearrange for first join table. Note we set the
@@ -1347,11 +1342,9 @@ public class TezCompiler extends PhyPlan
// its parallelism will be determined by the size of skewed table.
//TODO: Check if this really works as load vertex parallelism
// is determined during vertex construction.
- POLocalRearrangeTez lr =
- new POLocalRearrangeTez(new OperatorKey(scope,nig.getNextNodeId(scope)),
- Math.max(prevOp.getRequestedParallelism(), 1));
+ lrTez.setRequestedParallelism(prevOp.getRequestedParallelism());
try {
- lr.setIndex(0);
+ lrTez.setIndex(0);
} catch (ExecException e) {
int errCode = 2058;
String msg = "Unable to set index on newly created POLocalRearrange.";
@@ -1363,13 +1356,15 @@ public class TezCompiler extends PhyPlan
if (groups.size() == 1) {
type = groups.get(0).getLeaves().get(0).getResultType();
}
-
- // Run POLocalRearrange for first join table
- lr.setKeyType(type);
- lr.setPlans(groups);
- lr.setSkewedJoin(true);
- lr.setResultType(DataType.TUPLE);
- joinJobs[0].plan.addAsLeaf(lr);
+ lrTez.setKeyType(type);
+ lrTez.setPlans(groups);
+ lrTez.setSkewedJoin(true);
+ lrTez.setResultType(DataType.TUPLE);
+
+ POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
+ OperatorKey.genOpKey(scope), lrTez);
+ identityInOutTez.setInputKey(prevOp.getOperatorKey().toString());
+ joinJobs[0].plan.addAsLeaf(identityInOutTez);
joinJobs[0].setClosed(true);
rearrangeOutputs[0] = joinJobs[0];
@@ -1413,7 +1408,7 @@ public class TezCompiler extends PhyPlan
compiledInputs = new TezOperator[] {joinJobs[2]};
// Create POPakcage
- pkg = getPackage(2, type);
+ POPackage pkg = getPackage(2, type);
pkg.setResultType(DataType.TUPLE);
boolean [] inner = op.getInnerFlags();
pkg.getPkgr().setInner(inner);
@@ -1449,33 +1444,37 @@ public class TezCompiler extends PhyPlan
// Connect vertices
lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
lrTezSample.setOutputKey(sampleJobPair.first.getOperatorKey().toString());
+ identityInOutTez.setOutputKey(joinJobs[2].getOperatorKey().toString());
+ pr.setOutputKey(joinJobs[2].getOperatorKey().toString());
TezEdgeDescriptor edge = joinJobs[0].inEdges.get(prevOp.getOperatorKey());
// TODO: Convert to unsorted shuffle after TEZ-661
- // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
- edge.partitionerClass = RoundRobinPartitioner.class;
+ // Use 1-1 edge
+ edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ // If prevOp.requestedParallelism changes based on no. of input splits
+ // it will reflect for joinJobs[0] so that 1-1 edge will work.
+ joinJobs[0].setRequestedParallelismByReference(prevOp);
TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first);
- POSplit split = (POSplit) sampleJobPair.first.plan.getLeaves().get(0);
- List<PhysicalPlan> pp = split.getPlans();
- for (int i = 0; i < pp.size(); i++) {
- TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
+ POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
+ for (int i = 0; i < 2; i++) {
+ joinJobs[i].sampleOperator = sampleJobPair.first;
// Configure broadcast edges for distribution map
- edge = joinJobs[i].inEdges.get(sampleJobPair.first.getOperatorKey());
+ edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
edge.dataMovementType = DataMovementType.BROADCAST;
edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
- lrTez = (POLocalRearrangeTez) pp.get(i).getLeaves().get(0);
- lrTez.setOutputKey(joinJobs[i].getOperatorKey().toString());
- joinJobs[i].sampleOperator = sampleJobPair.first;
+ sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
// Configure skewed partitioner for join
edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
edge.partitionerClass = SkewedPartitionerTez.class;
}
+
joinJobs[2].setSkewedJoin(true);
phyToTezOpMap.put(op, curTezOp);
@@ -1609,7 +1608,7 @@ public class TezCompiler extends PhyPlan
return oper;
}
- private Pair<TezOperator,Integer> getQuantileJobs(
+ private Pair<TezOperator,Integer> getOrderbySamplingAggregationJob(
POSort inpSort,
int rp) throws PlanException, VisitorException, ExecException {
@@ -1636,7 +1635,7 @@ public class TezCompiler extends PhyPlan
}
}
- return getSamplingAggregationJobs(sort, rp, null, FindQuantiles.class.getName(), ctorArgs, 1);
+ return getSamplingAggregationJob(sort, rp, null, FindQuantiles.class.getName(), ctorArgs);
}
/**
@@ -1659,8 +1658,8 @@ public class TezCompiler extends PhyPlan
* @throws VisitorException
* @throws ExecException
*/
- private Pair<TezOperator,Integer> getSamplingAggregationJobs(POSort sort, int rp,
- List<PhysicalPlan> sortKeyPlans, String udfClassName, String[] udfArgs, int numLRs)
+ private Pair<TezOperator,Integer> getSamplingAggregationJob(POSort sort, int rp,
+ List<PhysicalPlan> sortKeyPlans, String udfClassName, String[] udfArgs)
throws PlanException, VisitorException, ExecException {
TezOperator oper = getTezOp();
@@ -1791,26 +1790,9 @@ public class TezCompiler extends PhyPlan
oper.plan.add(nfe3);
oper.plan.connect(nfe2, nfe3);
- if (numLRs > 1) {
- // Skewed join broadcast sample map to multiple vertices, so we need
- // to add POSplit to the plan and attach LRs to POSplit.
- POSplit split = new POSplit(new OperatorKey(scope,nig.getNextNodeId(scope)));
- oper.setSplitOperatorKey(split.getOperatorKey());
- oper.plan.add(split);
- oper.plan.connect(nfe3, split);
- splitsSeen.put(split.getOperatorKey(), oper);
-
- for (int i = 0; i < numLRs; i++) {
- POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
- PhysicalPlan pp = new PhysicalPlan();
- pp.add(lr);
- split.addPlan(pp);
- }
- } else {
- POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
- oper.plan.add(lr);
- oper.plan.connect(nfe3, lr);
- }
+ POValueOutputTez sampleOut = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ oper.plan.add(sampleOut);
+ oper.plan.connect(nfe3, sampleOut);
oper.setClosed(true);
oper.setRequestedParallelism(1);
@@ -1998,7 +1980,7 @@ public class TezCompiler extends PhyPlan
// pigContext.defaultParallel to be taken into account
int rp = Math.max(op.getRequestedParallelism(), 1);
- Pair<TezOperator, Integer> quantJobParallelismPair = getQuantileJobs(op, rp);
+ Pair<TezOperator, Integer> quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp);
TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]);
@@ -2029,8 +2011,8 @@ public class TezCompiler extends PhyPlan
edge.dataMovementType = DataMovementType.BROADCAST;
edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
- lr = (POLocalRearrangeTez)quantJobParallelismPair.first.plan.getLeaves().get(0);
- lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
+ POValueOutputTez sampleOut = (POValueOutputTez)quantJobParallelismPair.first.plan.getLeaves().get(0);
+ sampleOut.addOutputKey(sortOpers[0].getOperatorKey().toString());
sortOpers[0].sampleOperator = quantJobParallelismPair.first;
edge = TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]);
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld Mon Feb 17 08:02:44 2014
@@ -2,12 +2,12 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-36
+# TEZ DAG plan: scope-35
#--------------------------------------------------
-Tez vertex scope-11 -> Tez vertex scope-29,Tez vertex scope-18,
-Tez vertex scope-18 -> Tez vertex scope-29,
-Tez vertex scope-29 -> Tez vertex scope-31,
-Tez vertex scope-31
+Tez vertex scope-11 -> Tez vertex scope-28,Tez vertex scope-18,
+Tez vertex scope-18 -> Tez vertex scope-28,
+Tez vertex scope-28 -> Tez vertex scope-30,
+Tez vertex scope-30
Tez vertex scope-11
# Plan on vertex
@@ -21,7 +21,7 @@ Local Rearrange[tuple]{tuple}(false) - s
| |
| Project[int][0] - scope-15
|
- |---b: Local Rearrange[tuple]{int}(false) - scope-12 -> scope-29
+ |---b: Local Rearrange[tuple]{int}(false) - scope-12 -> scope-28
| |
| Project[int][0] - scope-8
|
@@ -38,9 +38,7 @@ Local Rearrange[tuple]{tuple}(false) - s
|---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
Tez vertex scope-18
# Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-28 -> scope-29
-| |
-| Constant(DummyVal) - scope-27
+POValueOutputTez - scope-27 -> [scope-28]
|
|---New For Each(false)[tuple] - scope-26
| |
@@ -55,17 +53,17 @@ Local Rearrange[tuple]{bytearray}(false)
| Project[bag][1] - scope-20
|
|---Package(Packager)[tuple]{bytearray} - scope-19
-Tez vertex scope-29
+Tez vertex scope-28
# Plan on vertex
-POIdentityInOutTez - scope-30 -> scope-31
+POIdentityInOutTez - scope-29 -> scope-30
| |
| Project[int][0] - scope-8
-Tez vertex scope-31
+Tez vertex scope-30
# Plan on vertex
b: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-10
|
-|---New For Each(true)[tuple] - scope-34
+|---New For Each(true)[tuple] - scope-33
| |
- | Project[bag][1] - scope-33
+ | Project[bag][1] - scope-32
|
- |---Package(LitePackager)[tuple]{int} - scope-32
\ No newline at end of file
+ |---Package(LitePackager)[tuple]{int} - scope-31
\ No newline at end of file
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld?rev=1568895&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld Mon Feb 17 08:02:44 2014
@@ -0,0 +1,101 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-56
+#--------------------------------------------------
+Tez vertex scope-27 -> Tez vertex scope-46,Tez vertex scope-36,
+Tez vertex scope-36 -> Tez vertex scope-46,Tez vertex scope-28,
+Tez vertex scope-46 -> Tez vertex scope-50,
+Tez vertex scope-28 -> Tez vertex scope-50,
+Tez vertex scope-50
+
+Tez vertex scope-27
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-31 -> scope-36
+| |
+| Constant(DummyVal) - scope-30
+|
+|---New For Each(true,true)[tuple] - scope-35
+ | |
+ | Project[int][0] - scope-16
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-34
+ | |
+ | |---Project[tuple][*] - scope-33
+ |
+ |---PoissonSample - scope-32
+ |
+ |---Local Rearrange[tuple]{int}(false) - scope-29 -> scope-46
+ | |
+ | Project[int][0] - scope-16
+ |
+ |---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-36
+# Plan on vertex
+POValueOutputTez - scope-45 -> [scope-46, scope-28]
+|
+|---New For Each(false)[tuple] - scope-44
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-43
+ | |
+ | |---Project[tuple][*] - scope-42
+ |
+ |---New For Each(false,false)[tuple] - scope-41
+ | |
+ | Constant(1) - scope-40
+ | |
+ | Project[bag][1] - scope-38
+ |
+ |---Package(Packager)[tuple]{bytearray} - scope-37
+Tez vertex scope-46
+# Plan on vertex
+POIdentityInOutTez - scope-47 -> scope-50
+| |
+| Project[int][0] - scope-16
+Tez vertex scope-28
+# Plan on vertex
+Partition Rearrange[tuple]{int}(false) - scope-48 -> scope-50
+| |
+| Project[int][0] - scope-17
+|
+|---b: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][0] - scope-9
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][1] - scope-12
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-50
+# Plan on vertex
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-26
+|
+|---d: New For Each(false,false,false)[bag] - scope-25
+ | |
+ | Project[int][0] - scope-19
+ | |
+ | Project[int][1] - scope-21
+ | |
+ | Project[int][3] - scope-23
+ |
+ |---New For Each(true,true)[tuple] - scope-54
+ | |
+ | Project[bag][1] - scope-52
+ | |
+ | Project[bag][2] - scope-53
+ |
+ |---Package(Packager)[tuple]{int} - scope-51
\ No newline at end of file
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld Mon Feb 17 08:02:44 2014
@@ -2,17 +2,17 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-210
+# TEZ DAG plan: scope-209
#--------------------------------------------------
-Tez vertex scope-106 -> Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-151,Tez vertex scope-140,Tez vertex scope-163,Tez vertex scope-188,
+Tez vertex scope-106 -> Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-150,Tez vertex scope-140,Tez vertex scope-162,Tez vertex scope-187,
Tez vertex scope-122
-Tez vertex scope-140 -> Tez vertex scope-151,
-Tez vertex scope-151 -> Tez vertex scope-153,
-Tez vertex scope-153
+Tez vertex scope-162 -> Tez vertex scope-187,
+Tez vertex scope-187
Tez vertex scope-126
+Tez vertex scope-140 -> Tez vertex scope-150,
+Tez vertex scope-150 -> Tez vertex scope-152,
+Tez vertex scope-152
Tez vertex scope-109
-Tez vertex scope-163 -> Tez vertex scope-188,
-Tez vertex scope-188
Tez vertex scope-106
# Plan on vertex
@@ -40,7 +40,7 @@ Tez vertex scope-106
| | | | |
| | | | Project[int][0] - scope-137
| | | |
-| | | |---e1: Local Rearrange[tuple]{int}(false) - scope-134 -> scope-151
+| | | |---e1: Local Rearrange[tuple]{int}(false) - scope-134 -> scope-150
| | | | |
| | | | Project[int][0] - scope-88
| | | |
@@ -52,9 +52,9 @@ Tez vertex scope-106
| | | | |
| | | | |---Constant(3) - scope-86
| | | |
-| | | f1: Local Rearrange[tuple]{tuple}(false) - scope-162 -> scope-163
+| | | f1: Local Rearrange[tuple]{tuple}(false) - scope-161 -> scope-162
| | | | |
-| | | | Project[tuple][*] - scope-161
+| | | | Project[tuple][*] - scope-160
| | | |
| | | |---f1: Limit - scope-95
| | | |
@@ -80,21 +80,21 @@ Tez vertex scope-106
| | | |
| | | Project[int][0] - scope-48
| | |
-| | c2: Local Rearrange[tuple]{int}(false) - scope-202 -> scope-126
+| | c2: Local Rearrange[tuple]{int}(false) - scope-201 -> scope-126
| | | |
-| | | Project[int][0] - scope-204
+| | | Project[int][0] - scope-203
| | |
-| | |---c3: New For Each(false,false)[bag] - scope-190
+| | |---c3: New For Each(false,false)[bag] - scope-189
| | | |
-| | | Project[int][0] - scope-191
+| | | Project[int][0] - scope-190
| | | |
-| | | POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-192
+| | | POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-191
| | | |
-| | | |---Project[bag][0] - scope-193
+| | | |---Project[bag][0] - scope-192
| | | |
-| | | |---Project[bag][1] - scope-194
+| | | |---Project[bag][1] - scope-193
| | |
-| | |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-205
+| | |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-204
| |
| |---c: Filter[bag] - scope-34
| | |
@@ -108,9 +108,9 @@ Tez vertex scope-106
| | |
| | d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-80
| | |
-| | f2: Local Rearrange[tuple]{tuple}(false) - scope-185 -> scope-188
+| | f2: Local Rearrange[tuple]{tuple}(false) - scope-184 -> scope-187
| | | |
-| | | Project[tuple][*] - scope-184
+| | | Project[tuple][*] - scope-183
| |
| |---d1: Filter[bag] - scope-73
| | |
@@ -150,54 +150,43 @@ c1: Store(file:///tmp/output/c1:org.apac
| Project[bag][2] - scope-52
|
|---c1: Package(Packager)[tuple]{int} - scope-46
-Tez vertex scope-140
+Tez vertex scope-162
# Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-150 -> scope-151
+f1: Split - scope-96
+| |
+| f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
| |
-| Constant(DummyVal) - scope-149
+| f2: Local Rearrange[tuple]{tuple}(false) - scope-186 -> scope-187
+| | |
+| | Project[tuple][*] - scope-185
|
-|---New For Each(false)[tuple] - scope-148
- | |
- | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-147
- | |
- | |---Project[tuple][*] - scope-146
+|---f1: Limit - scope-166
|
- |---New For Each(false,false)[tuple] - scope-145
- | |
- | Constant(1) - scope-144
+ |---f1: New For Each(true)[bag] - scope-165
| |
- | Project[bag][1] - scope-142
+ | Project[tuple][1] - scope-164
|
- |---Package(Packager)[tuple]{bytearray} - scope-141
-Tez vertex scope-151
+ |---f1: Package(Packager)[tuple]{tuple} - scope-163
+Tez vertex scope-187
# Plan on vertex
-POIdentityInOutTez - scope-152 -> scope-153
-| |
-| Project[int][0] - scope-88
-Tez vertex scope-153
-# Plan on vertex
-e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-90
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
|
-|---New For Each(true)[tuple] - scope-156
- | |
- | Project[bag][1] - scope-155
- |
- |---Package(LitePackager)[tuple]{int} - scope-154
+|---f2: Package(Packager)[tuple]{tuple} - scope-188
Tez vertex scope-126
# Combine plan on edge <scope-106>
-c2: Local Rearrange[tuple]{int}(false) - scope-206 -> scope-126
+c2: Local Rearrange[tuple]{int}(false) - scope-205 -> scope-126
| |
-| Project[int][0] - scope-208
+| Project[int][0] - scope-207
|
-|---c3: New For Each(false,false)[bag] - scope-195
+|---c3: New For Each(false,false)[bag] - scope-194
| |
- | Project[int][0] - scope-196
+ | Project[int][0] - scope-195
| |
- | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-197
+ | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-196
| |
- | |---Project[bag][1] - scope-198
+ | |---Project[bag][1] - scope-197
|
- |---c2: Package(CombinerPackager)[tuple]{int} - scope-201
+ |---c2: Package(CombinerPackager)[tuple]{int} - scope-200
# Plan on vertex
c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-68
|
@@ -207,9 +196,40 @@ c3: Store(file:///tmp/output/c1:org.apac
| |
| POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-65
| |
- | |---Project[bag][1] - scope-199
+ | |---Project[bag][1] - scope-198
|
|---c2: Package(CombinerPackager)[tuple]{int} - scope-58
+Tez vertex scope-140
+# Plan on vertex
+POValueOutputTez - scope-149 -> [scope-150]
+|
+|---New For Each(false)[tuple] - scope-148
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-147
+ | |
+ | |---Project[tuple][*] - scope-146
+ |
+ |---New For Each(false,false)[tuple] - scope-145
+ | |
+ | Constant(1) - scope-144
+ | |
+ | Project[bag][1] - scope-142
+ |
+ |---Package(Packager)[tuple]{bytearray} - scope-141
+Tez vertex scope-150
+# Plan on vertex
+POIdentityInOutTez - scope-151 -> scope-152
+| |
+| Project[int][0] - scope-88
+Tez vertex scope-152
+# Plan on vertex
+e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-90
+|
+|---New For Each(true)[tuple] - scope-155
+ | |
+ | Project[bag][1] - scope-154
+ |
+ |---Package(LitePackager)[tuple]{int} - scope-153
Tez vertex scope-109
# Plan on vertex
b1: Split - scope-20
@@ -228,26 +248,4 @@ b1: Split - scope-20
| | |
| | |---Project[bag][1] - scope-28
|
-|---b1: Package(Packager)[tuple]{int} - scope-17
-Tez vertex scope-163
-# Plan on vertex
-f1: Split - scope-96
-| |
-| f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
-| |
-| f2: Local Rearrange[tuple]{tuple}(false) - scope-187 -> scope-188
-| | |
-| | Project[tuple][*] - scope-186
-|
-|---f1: Limit - scope-167
- |
- |---f1: New For Each(true)[bag] - scope-166
- | |
- | Project[tuple][1] - scope-165
- |
- |---f1: Package(Packager)[tuple]{tuple} - scope-164
-Tez vertex scope-188
-# Plan on vertex
-f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
-|
-|---f2: Package(Packager)[tuple]{tuple} - scope-189
\ No newline at end of file
+|---b1: Package(Packager)[tuple]{int} - scope-17
\ No newline at end of file
Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1568895&r1=1568894&r2=1568895&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Mon Feb 17 08:02:44 2014
@@ -106,6 +106,18 @@ public class TestTezCompiler {
}
@Test
+ public void testSkewedJoin() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "c = join a by x, b by x using 'skewed';" +
+ "d = foreach c generate a::x as x, y, z;" +
+ "store d into 'file:///tmp/output';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld");
+ }
+
+ @Test
public void testLimit() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:int);" +