You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/01/31 01:20:34 UTC
svn commit: r1563022 - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/...
Author: rohini
Date: Fri Jan 31 00:20:34 2014
New Revision: 1563022
URL: http://svn.apache.org/r1563022
Log:
PIG-3732: Use ONE_TO_ONE edge and IdentityInOut in orderby intermediate vertex
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java
pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Jan 31 00:20:34 2014
@@ -57,7 +57,8 @@ import org.apache.pig.impl.util.Utils;
public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
implements Configurable {
- protected Map<PigNullableWritable, DiscreteProbabilitySampleGenerator> weightedParts;
+ protected Map<PigNullableWritable, DiscreteProbabilitySampleGenerator> weightedParts =
+ new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
protected PigNullableWritable[] quantiles;
private RawComparator<PigNullableWritable> comparator;
private PigContext pigContext;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Jan 31 00:20:34 2014
@@ -65,6 +65,8 @@ public abstract class PhysicalOperator e
private static final Log log = LogFactory.getLog(PhysicalOperator.class);
protected static final long serialVersionUID = 1L;
+ protected static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null);
+ protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null);
// The degree of parallelism requested
protected int requestedParallelism;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Fri Jan 31 00:20:34 2014
@@ -45,7 +45,6 @@ import org.apache.tez.runtime.library.ou
public class POLocalRearrangeTez extends POLocalRearrange implements TezOutput {
private static final long serialVersionUID = 1L;
- protected static Result empty = new Result(POStatus.STATUS_NULL, null);
protected String outputKey;
protected transient KeyValueWriter writer;
@@ -160,7 +159,7 @@ public class POLocalRearrangeTez extends
} else {
illustratorMarkup(res.result, res.result, 0);
}
- res = empty;
+ res = RESULT_EMPTY;
break;
case POStatus.STATUS_EOP:
case POStatus.STATUS_ERR:
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java Fri Jan 31 00:20:34 2014
@@ -141,7 +141,7 @@ public class POPartitionRearrangeTez ext
}
}
- res = empty;
+ res = RESULT_EMPTY;
}
return inp;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Fri Jan 31 00:20:34 2014
@@ -33,7 +33,6 @@ import org.apache.pig.data.AccumulativeB
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.tez.runtime.api.LogicalInput;
@@ -109,11 +108,7 @@ public class POShuffleTezLoad extends PO
hasData = true;
cur = readers.get(i).getCurrentKey();
if (min == null || comparator.compare(min, cur) > 0) {
- min = PigNullableWritable.newInstance((PigNullableWritable)cur);
- if (isSkewedJoin) {
- ((NullablePartitionWritable)min).setKey(
- ((NullablePartitionWritable)cur).getKey());
- }
+ min = ((PigNullableWritable)cur).clone();
}
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Fri Jan 31 00:20:34 2014
@@ -151,6 +151,10 @@ public class PigProcessor implements Log
for (POShuffleTezLoad shuffle : shuffles){
shuffle.attachInputs(inputs, conf);
}
+ LinkedList<POIdentityInOutTez> identityInOuts = PlanHelper.getPhysicalOperators(execPlan, POIdentityInOutTez.class);
+ for (POIdentityInOutTez identityInOut : identityInOuts){
+ identityInOut.attachInputs(inputs, conf);
+ }
LinkedList<POFRJoinTez> broadcasts = PlanHelper.getPhysicalOperators(execPlan, POFRJoinTez.class);
for (POFRJoinTez broadcast : broadcasts){
broadcast.attachInputs(inputs, conf);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Fri Jan 31 00:20:34 2014
@@ -1445,8 +1445,12 @@ public class TezCompiler extends PhyPlan
* @return Tez operator that now is finished with a store.
* @throws PlanException
*/
- private TezOperator endSingleInputWithStoreAndSample(POSort sort, POLocalRearrangeTez lr,
- POLocalRearrangeTez lrSample) throws PlanException {
+ private TezOperator endSingleInputWithStoreAndSample(
+ POSort sort,
+ POLocalRearrangeTez lr,
+ POLocalRearrangeTez lrSample,
+ byte keyType,
+ Pair<POProject, Byte>[] fields) throws PlanException {
if(compiledInputs.length>1) {
int errCode = 2023;
String msg = "Received a multi input plan when expecting only a single input one.";
@@ -1454,6 +1458,35 @@ public class TezCompiler extends PhyPlan
}
TezOperator oper = compiledInputs[0];
if (!oper.isClosed()) {
+
+ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+ if (fields == null) {
+ // This is project *
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj.setStar(true);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.TUPLE);
+ ep.add(prj);
+ eps.add(ep);
+ } else {
+ // Attach the sort plans to the local rearrange to get the
+ // projection.
+ eps.addAll(sort.getSortPlans());
+ }
+
+ try {
+ lr.setIndex(0);
+ } catch (ExecException e) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created POLocalRearrange.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+ lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : keyType);
+ lr.setPlans(eps);
+ lr.setResultType(DataType.TUPLE);
+ lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
+
lr.setOutputKey(curTezOp.getOperatorKey().toString());
oper.plan.addAsLeaf(lr);
@@ -1769,31 +1802,20 @@ public class TezCompiler extends PhyPlan
}
private TezOperator[] getSortJobs(
+ TezOperator inputOper,
+ POLocalRearrangeTez inputOperRearrange,
POSort sort,
- int rp,
byte keyType,
Pair<POProject, Byte>[] fields) throws PlanException{
TezOperator[] opers = new TezOperator[2];
TezOperator oper1 = getTezOp();
tezPlan.add(oper1);
opers[0] = oper1;
- oper1.requestedParallelism = rp;
long limit = sort.getLimit();
+ //TODO: TezOperator limit not used at all
oper1.limit = limit;
- List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
-
- POPackage pkg = getPackage(1, DataType.BYTEARRAY);
- oper1.plan.add(pkg);
-
- POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
- project.setResultType(DataType.BAG);
- project.setStar(false);
- project.setColumn(1);
- POForEach forEach = TezCompilerUtil.getForEach(project, sort.getRequestedParallelism(), scope, nig);
- oper1.plan.addAsLeaf(forEach);
-
boolean[] sortOrder;
List<Boolean> sortOrderList = sort.getMAscCols();
@@ -1805,42 +1827,18 @@ public class TezCompiler extends PhyPlan
oper1.setSortOrder(sortOrder);
}
- if (fields == null) {
- // This is project *
- PhysicalPlan ep = new PhysicalPlan();
- POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setStar(true);
- prj.setOverloaded(false);
- prj.setResultType(DataType.TUPLE);
- ep.add(prj);
- eps1.add(ep);
- } else {
- // Attach the sort plans to the local rearrange to get the
- // projection.
- eps1.addAll(sort.getSortPlans());
- }
-
- POLocalRearrangeTez lr = new POLocalRearrangeTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
- try {
- lr.setIndex(0);
- } catch (ExecException e) {
- int errCode = 2058;
- String msg = "Unable to set index on newly created POLocalRearrange.";
- throw new PlanException(msg, errCode, PigException.BUG, e);
- }
- lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : keyType);
- lr.setPlans(eps1);
- lr.setResultType(DataType.TUPLE);
- lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
- oper1.plan.addAsLeaf(lr);
-
+ POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
+ new OperatorKey(scope, nig.getNextNodeId(scope)),
+ inputOperRearrange);
+ identityInOutTez.setInputKey(inputOper.getOperatorKey().toString());
+ oper1.plan.addAsLeaf(identityInOutTez);
oper1.setClosed(true);
TezOperator oper2 = getTezOp();
oper2.setGlobalSort(true);
opers[1] = oper2;
tezPlan.add(oper2);
- lr.setOutputKey(oper2.getOperatorKey().toString());
+ identityInOutTez.setOutputKey(oper2.getOperatorKey().toString());
if (limit!=-1) {
POPackage pkg_c = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
@@ -1890,7 +1888,7 @@ public class TezCompiler extends PhyPlan
combinePlan.addAsLeaf(lr_c2);
}
- pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
pkg.setPkgr(new LitePackager());
pkg.getPkgr().setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE : keyType);
pkg.setNumInps(1);
@@ -1935,22 +1933,33 @@ public class TezCompiler extends PhyPlan
throw new PlanException(msg, errCode, PigException.BUG, ve);
}
- POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
+ POLocalRearrangeTez lr = new POLocalRearrangeTez(new OperatorKey(scope, nig.getNextNodeId(scope)));
POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
- TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample);
+ TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample, keyType, fields);
int rp = Math.max(op.getRequestedParallelism(), 1);
Pair<TezOperator, Integer> quantJobParallelismPair = getQuantileJobs(op, rp);
- TezOperator[] sortOpers = getSortJobs(op, quantJobParallelismPair.second, keyType, fields);
+ TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]);
// TODO: Convert to unsorted shuffle after TEZ-661
// edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
// edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
- edge.partitionerClass = RoundRobinPartitioner.class;
+ // edge.partitionerClass = RoundRobinPartitioner.class;
+
+ // TODO: Test which is better - ONE_TO_ONE from prevOper to same number of
+ // tasks in sortOpers[0] and then sortOpers[1] with requestedParallelism
+ // or unsorted shuffled output (TEZ-661) from prevOper to
+ // sortOpers[0] with requestedParallelism and then sortOpers[1] with
+ // requestedParallelism
+ sortOpers[0].requestedParallelism = prevOper.requestedParallelism;
+ sortOpers[1].requestedParallelism = quantJobParallelismPair.second;
+ edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
handleSplitAndConnect(tezPlan, prevOper, quantJobParallelismPair.first, false);
lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java Fri Jan 31 00:20:34 2014
@@ -27,6 +27,7 @@ public class TezDAG extends DAG {
public TezDAG(String name) {
super(name);
this.credentials = new Credentials();
+ super.setCredentials(credentials);
}
public Credentials getCredentials() {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Fri Jan 31 00:20:34 2014
@@ -17,7 +17,6 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez;
-import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -64,7 +63,6 @@ public class WeightedRangePartitionerTez
long start = System.currentTimeMillis();
try {
- weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
convertToArray(quantilesList);
Modified: pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java Fri Jan 31 00:20:34 2014
@@ -55,7 +55,15 @@ public class NullablePartitionWritable e
return partitionIndex;
}
- @Override
+ @Override
+ public NullablePartitionWritable clone() throws CloneNotSupportedException {
+ NullablePartitionWritable clone = new NullablePartitionWritable();
+ clone.setKey(this.getKey());
+ clone.partitionIndex = this.partitionIndex;
+ return clone;
+ }
+
+ @Override
public int compareTo(Object o) {
return key.compareTo(((NullablePartitionWritable)o).getKey());
}
Modified: pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java Fri Jan 31 00:20:34 2014
@@ -40,7 +40,7 @@ import org.apache.pig.data.Tuple;
//Put in to make the compiler not complain about WritableComparable
//being a generic type.
@SuppressWarnings("unchecked")
-public abstract class PigNullableWritable implements WritableComparable {
+public abstract class PigNullableWritable implements WritableComparable, Cloneable {
/**
* indices in multiquery optimized maps
@@ -61,12 +61,17 @@ public abstract class PigNullableWritabl
private byte mIndex;
- public static PigNullableWritable newInstance(PigNullableWritable copy) throws Exception {
- PigNullableWritable instance = copy.getClass().newInstance();
- instance.mNull = copy.mNull;
- instance.mValue = copy.mValue;
- instance.mIndex = copy.mIndex;
- return instance;
+ @Override
+ public PigNullableWritable clone() throws CloneNotSupportedException {
+ try {
+ PigNullableWritable clone = this.getClass().newInstance();
+ clone.mNull = this.mNull;
+ clone.mValue = this.mValue;
+ clone.mIndex = this.mIndex;
+ return clone;
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while cloning " + this, e);
+ }
}
/**
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld Fri Jan 31 00:20:34 2014
@@ -2,28 +2,28 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-40
+# TEZ DAG plan: scope-36
#--------------------------------------------------
-Tez vertex scope-11 -> Tez vertex scope-30,Tez vertex scope-19,
-Tez vertex scope-19 -> Tez vertex scope-30,
-Tez vertex scope-30 -> Tez vertex scope-35,
-Tez vertex scope-35
+Tez vertex scope-11 -> Tez vertex scope-29,Tez vertex scope-18,
+Tez vertex scope-18 -> Tez vertex scope-29,
+Tez vertex scope-29 -> Tez vertex scope-31,
+Tez vertex scope-31
Tez vertex scope-11
# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-15 -> scope-19
+Local Rearrange[tuple]{tuple}(false) - scope-14 -> scope-18
| |
-| Constant(DummyVal) - scope-14
+| Constant(DummyVal) - scope-13
|
-|---ReservoirSample - scope-18
+|---ReservoirSample - scope-17
|
- |---New For Each(false)[tuple] - scope-17
+ |---New For Each(false)[tuple] - scope-16
| |
- | Project[int][0] - scope-16
+ | Project[int][0] - scope-15
|
- |---Local Rearrange[tuple]{bytearray}(false) - scope-13 -> scope-30
+ |---b: Local Rearrange[tuple]{int}(false) - scope-12 -> scope-29
| |
- | Constant(DummyVal) - scope-12
+ | Project[int][0] - scope-8
|
|---a: New For Each(false,false)[bag] - scope-7
| |
@@ -36,42 +36,36 @@ Local Rearrange[tuple]{tuple}(false) - s
| |---Project[bytearray][1] - scope-4
|
|---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
-Tez vertex scope-19
+Tez vertex scope-18
# Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-29 -> scope-30
+Local Rearrange[tuple]{bytearray}(false) - scope-28 -> scope-29
| |
-| Constant(DummyVal) - scope-28
+| Constant(DummyVal) - scope-27
|
-|---New For Each(false)[tuple] - scope-27
+|---New For Each(false)[tuple] - scope-26
| |
- | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-26
+ | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-25
| |
- | |---Project[tuple][*] - scope-25
+ | |---Project[tuple][*] - scope-24
|
- |---New For Each(false,false)[tuple] - scope-24
+ |---New For Each(false,false)[tuple] - scope-23
| |
- | Constant(1) - scope-23
+ | Constant(1) - scope-22
| |
- | Project[bag][1] - scope-21
+ | Project[bag][1] - scope-20
|
- |---Package(Packager)[tuple]{bytearray} - scope-20
-Tez vertex scope-30
+ |---Package(Packager)[tuple]{bytearray} - scope-19
+Tez vertex scope-29
# Plan on vertex
-b: Local Rearrange[tuple]{int}(false) - scope-34 -> scope-35
+POIdentityInOutTez - scope-30 -> scope-31
| |
| Project[int][0] - scope-8
-|
-|---New For Each(true)[bag] - scope-33
- | |
- | Project[bag][1] - scope-32
- |
- |---Package(Packager)[tuple]{bytearray} - scope-31
-Tez vertex scope-35
+Tez vertex scope-31
# Plan on vertex
b: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-10
|
-|---New For Each(true)[tuple] - scope-38
+|---New For Each(true)[tuple] - scope-34
| |
- | Project[bag][1] - scope-37
+ | Project[bag][1] - scope-33
|
- |---Package(LitePackager)[tuple]{int} - scope-36
\ No newline at end of file
+ |---Package(LitePackager)[tuple]{int} - scope-32
\ No newline at end of file
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld Fri Jan 31 00:20:34 2014
@@ -2,16 +2,16 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-200
+# TEZ DAG plan: scope-196
#--------------------------------------------------
-Tez vertex scope-106 -> Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-152,Tez vertex scope-141,Tez vertex scope-178,Tez vertex scope-178,
+Tez vertex scope-106 -> Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-151,Tez vertex scope-140,Tez vertex scope-174,Tez vertex scope-174,
Tez vertex scope-122
+Tez vertex scope-174
+Tez vertex scope-140 -> Tez vertex scope-151,
+Tez vertex scope-151 -> Tez vertex scope-153,
+Tez vertex scope-153
Tez vertex scope-126
-Tez vertex scope-141 -> Tez vertex scope-152,
-Tez vertex scope-152 -> Tez vertex scope-157,
-Tez vertex scope-157
Tez vertex scope-109
-Tez vertex scope-178
Tez vertex scope-106
# Plan on vertex
@@ -29,19 +29,19 @@ Tez vertex scope-106
| | |
| | 1-2: Split - scope-83
| | | |
-| | | Local Rearrange[tuple]{tuple}(false) - scope-137 -> scope-141
+| | | Local Rearrange[tuple]{tuple}(false) - scope-136 -> scope-140
| | | | |
-| | | | Constant(DummyVal) - scope-136
+| | | | Constant(DummyVal) - scope-135
| | | |
-| | | |---ReservoirSample - scope-140
+| | | |---ReservoirSample - scope-139
| | | |
-| | | |---New For Each(false)[tuple] - scope-139
+| | | |---New For Each(false)[tuple] - scope-138
| | | | |
-| | | | Project[int][0] - scope-138
+| | | | Project[int][0] - scope-137
| | | |
-| | | |---Local Rearrange[tuple]{bytearray}(false) - scope-135 -> scope-152
+| | | |---e1: Local Rearrange[tuple]{int}(false) - scope-134 -> scope-151
| | | | |
-| | | | Constant(DummyVal) - scope-134
+| | | | Project[int][0] - scope-88
| | | |
| | | |---e: Filter[bag] - scope-84
| | | | |
@@ -55,9 +55,9 @@ Tez vertex scope-106
| | | | |
| | | | f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
| | | | |
-| | | | f2: Local Rearrange[tuple]{tuple}(false) - scope-177 -> scope-178
+| | | | f2: Local Rearrange[tuple]{tuple}(false) - scope-173 -> scope-174
| | | | | |
-| | | | | Project[tuple][*] - scope-176
+| | | | | Project[tuple][*] - scope-172
| | | |
| | | |---f1: Limit - scope-95
| | | |
@@ -83,21 +83,21 @@ Tez vertex scope-106
| | | |
| | | Project[int][0] - scope-48
| | |
-| | c2: Local Rearrange[tuple]{int}(false) - scope-192 -> scope-126
+| | c2: Local Rearrange[tuple]{int}(false) - scope-188 -> scope-126
| | | |
-| | | Project[int][0] - scope-194
+| | | Project[int][0] - scope-190
| | |
-| | |---c3: New For Each(false,false)[bag] - scope-180
+| | |---c3: New For Each(false,false)[bag] - scope-176
| | | |
-| | | Project[int][0] - scope-181
+| | | Project[int][0] - scope-177
| | | |
-| | | POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-182
+| | | POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-178
| | | |
-| | | |---Project[bag][0] - scope-183
+| | | |---Project[bag][0] - scope-179
| | | |
-| | | |---Project[bag][1] - scope-184
+| | | |---Project[bag][1] - scope-180
| | |
-| | |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-195
+| | |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-191
| |
| |---c: Filter[bag] - scope-34
| | |
@@ -111,9 +111,9 @@ Tez vertex scope-106
| | |
| | d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-80
| | |
-| | f2: Local Rearrange[tuple]{tuple}(false) - scope-175 -> scope-178
+| | f2: Local Rearrange[tuple]{tuple}(false) - scope-171 -> scope-174
| | | |
-| | | Project[tuple][*] - scope-174
+| | | Project[tuple][*] - scope-170
| |
| |---d1: Filter[bag] - scope-73
| | |
@@ -153,72 +153,71 @@ c1: Store(file:///tmp/output/c1:org.apac
| Project[bag][2] - scope-52
|
|---c1: Package(Packager)[tuple]{int} - scope-46
-Tez vertex scope-126
-# Combine plan on edge <scope-106>
-c2: Local Rearrange[tuple]{int}(false) - scope-196 -> scope-126
-| |
-| Project[int][0] - scope-198
-|
-|---c3: New For Each(false,false)[bag] - scope-185
- | |
- | Project[int][0] - scope-186
- | |
- | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-187
- | |
- | |---Project[bag][1] - scope-188
- |
- |---c2: Package(CombinerPackager)[tuple]{int} - scope-191
+Tez vertex scope-174
# Plan on vertex
-c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-68
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
|
-|---c3: New For Each(false,false)[bag] - scope-67
- | |
- | Project[int][0] - scope-61
- | |
- | POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-65
- | |
- | |---Project[bag][1] - scope-189
- |
- |---c2: Package(CombinerPackager)[tuple]{int} - scope-58
-Tez vertex scope-141
+|---f2: Package(Packager)[tuple]{tuple} - scope-175
+Tez vertex scope-140
# Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-151 -> scope-152
+Local Rearrange[tuple]{bytearray}(false) - scope-150 -> scope-151
| |
-| Constant(DummyVal) - scope-150
+| Constant(DummyVal) - scope-149
|
-|---New For Each(false)[tuple] - scope-149
+|---New For Each(false)[tuple] - scope-148
| |
- | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-148
+ | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-147
| |
- | |---Project[tuple][*] - scope-147
+ | |---Project[tuple][*] - scope-146
|
- |---New For Each(false,false)[tuple] - scope-146
+ |---New For Each(false,false)[tuple] - scope-145
| |
- | Constant(1) - scope-145
+ | Constant(1) - scope-144
| |
- | Project[bag][1] - scope-143
+ | Project[bag][1] - scope-142
|
- |---Package(Packager)[tuple]{bytearray} - scope-142
-Tez vertex scope-152
+ |---Package(Packager)[tuple]{bytearray} - scope-141
+Tez vertex scope-151
# Plan on vertex
-e1: Local Rearrange[tuple]{int}(false) - scope-156 -> scope-157
+POIdentityInOutTez - scope-152 -> scope-153
| |
| Project[int][0] - scope-88
+Tez vertex scope-153
+# Plan on vertex
+e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-90
|
-|---New For Each(true)[bag] - scope-155
+|---New For Each(true)[tuple] - scope-156
| |
- | Project[bag][1] - scope-154
+ | Project[bag][1] - scope-155
|
- |---Package(Packager)[tuple]{bytearray} - scope-153
-Tez vertex scope-157
+ |---Package(LitePackager)[tuple]{int} - scope-154
+Tez vertex scope-126
+# Combine plan on edge <scope-106>
+c2: Local Rearrange[tuple]{int}(false) - scope-192 -> scope-126
+| |
+| Project[int][0] - scope-194
+|
+|---c3: New For Each(false,false)[bag] - scope-181
+ | |
+ | Project[int][0] - scope-182
+ | |
+ | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-183
+ | |
+ | |---Project[bag][1] - scope-184
+ |
+ |---c2: Package(CombinerPackager)[tuple]{int} - scope-187
# Plan on vertex
-e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-90
+c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-68
|
-|---New For Each(true)[tuple] - scope-160
+|---c3: New For Each(false,false)[bag] - scope-67
+ | |
+ | Project[int][0] - scope-61
+ | |
+ | POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-65
| |
- | Project[bag][1] - scope-159
+ | |---Project[bag][1] - scope-185
|
- |---Package(LitePackager)[tuple]{int} - scope-158
+ |---c2: Package(CombinerPackager)[tuple]{int} - scope-58
Tez vertex scope-109
# Plan on vertex
b1: Split - scope-20
@@ -237,9 +236,4 @@ b1: Split - scope-20
| | |
| | |---Project[bag][1] - scope-28
|
-|---b1: Package(Packager)[tuple]{int} - scope-17
-Tez vertex scope-178
-# Plan on vertex
-f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
-|
-|---f2: Package(Packager)[tuple]{tuple} - scope-179
\ No newline at end of file
+|---b1: Package(Packager)[tuple]{int} - scope-17
\ No newline at end of file