You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/22 16:59:44 UTC
svn commit: r1589152 [1/2] - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/backend/hadoop/executionengine/tez/
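src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/
src/org/apache/pig/tools/pigstats/tez/
test/e2e/pig/tests/
test/org/apache/pig/test/data/GoldenFiles/tez/
test/org/apache/pig/tez/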
Author: rohini
Date: Tue Apr 22 14:59:44 2014
New Revision: 1589152
URL: http://svn.apache.org/r1589152
Log:
PIG-3855: Turn on UnionOptimizer by default and add new e2e tests for union (rohini)
Added:
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
pig/branches/tez/test/e2e/pig/tests/nightly.conf
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Tue Apr 22 14:59:44 2014
@@ -719,27 +719,40 @@ public class POLocalRearrange extends Ph
protected void deepCopyTo(POLocalRearrange clone)
throws CloneNotSupportedException {
- List<PhysicalPlan> clonePlans = new ArrayList<PhysicalPlan>(
- plans.size());
- for (PhysicalPlan plan : plans) {
- clonePlans.add(plan.clone());
+
+ clone.setParentPlan(parentPlan);
+ clone.index = index;
+ if (useSecondaryKey) {
+ clone.keyType = mainKeyType;
+ } else {
+ clone.keyType = keyType;
}
+ clone.setUseSecondaryKey(useSecondaryKey);
+ // Needs to be called as setDistinct so that the fake index tuple gets
+ // created.
+ clone.setDistinct(mIsDistinct);
+ clone.setCross(isCross);
+ clone.addOriginalLocation(alias, getOriginalLocations());
+ clone.setStripKeyFromValue(stripKeyFromValue);
+
try {
- clone.setPlans(clonePlans);
+ clone.setPlans(clonePlans(plans));
+ if (secondaryPlans != null) {
+ clone.setSecondaryPlans(clonePlans(secondaryPlans));
+ }
} catch (PlanException pe) {
CloneNotSupportedException cnse = new CloneNotSupportedException("Problem with setting plans of " + this.getClass().getSimpleName());
cnse.initCause(pe);
throw cnse;
}
- clone.keyType = keyType;
- clone.mainKeyType = mainKeyType;
- clone.secondaryKeyType = secondaryKeyType;
- clone.useSecondaryKey = useSecondaryKey;
- clone.index = index;
- // Needs to be called as setDistinct so that the fake index tuple gets
- // created.
- clone.setDistinct(mIsDistinct);
- clone.addOriginalLocation(alias, getOriginalLocations());
+ }
+
+ private List<PhysicalPlan> clonePlans(List<PhysicalPlan> origPlans) throws CloneNotSupportedException {
+ List<PhysicalPlan> clonePlans = new ArrayList<PhysicalPlan>(origPlans.size());
+ for (PhysicalPlan plan : origPlans) {
+ clonePlans.add(plan.clone());
+ }
+ return clonePlans;
}
public boolean isCross() {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Tue Apr 22 14:59:44 2014
@@ -165,12 +165,8 @@ public class MultiQueryOptimizerTez exte
parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
}
subPlanOper.setRequestedParallelismByReference(parentOper);
- if (subPlanOper.UDFs != null) {
- parentOper.UDFs.addAll(subPlanOper.UDFs);
- }
- if (subPlanOper.scalars != null) {
- parentOper.scalars.addAll(subPlanOper.scalars);
- }
+ parentOper.UDFs.addAll(subPlanOper.UDFs);
+ parentOper.scalars.addAll(subPlanOper.scalars);
if (subPlanOper.outEdges != null) {
for (Entry<OperatorKey, TezEdgeDescriptor> entry: subPlanOper.outEdges.entrySet()) {
parentOper.outEdges.put(entry.getKey(), entry.getValue());
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java Tue Apr 22 14:59:44 2014
@@ -39,7 +39,6 @@ import org.apache.pig.impl.io.PigNullabl
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import com.google.common.collect.Lists;
@@ -53,7 +52,7 @@ public class POFRJoinTez extends POFRJoi
private static final long serialVersionUID = 1L;
// For replicated tables
- private List<ShuffledUnorderedKVInput> replInputs = Lists.newArrayList();
+ private List<LogicalInput> replInputs = Lists.newArrayList();
private List<KeyValueReader> replReaders = Lists.newArrayList();
private List<String> inputKeys;
private transient boolean isInputCached;
@@ -94,11 +93,8 @@ public class POFRJoinTez extends POFRJoi
try {
for (String key : inputKeys) {
LogicalInput input = inputs.get(key);
- if (input instanceof ShuffledUnorderedKVInput) {
- ShuffledUnorderedKVInput suInput = (ShuffledUnorderedKVInput) input;
- this.replInputs.add(suInput);
- this.replReaders.add((KeyValueReader) suInput.getReader());
- }
+ this.replInputs.add(input);
+ this.replReaders.add((KeyValueReader) input.getReader());
}
} catch (Exception e) {
throw new ExecException(e);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java Tue Apr 22 14:59:44 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
@@ -47,6 +49,7 @@ import org.apache.tez.runtime.library.ap
public class POIdentityInOutTez extends POLocalRearrangeTez implements TezInput, TezOutput {
private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POIdentityInOutTez.class);
private String inputKey;
private transient KeyValueReader reader;
private transient KeyValuesReader shuffleReader;
@@ -92,6 +95,7 @@ public class POIdentityInOutTez extends
shuffleInput = true;
shuffleReader = (KeyValuesReader) r;
}
+ LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r);
} catch (Exception e) {
throw new ExecException(e);
}
@@ -106,6 +110,7 @@ public class POIdentityInOutTez extends
}
try {
writer = (KeyValueWriter) output.getWriter();
+ LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
} catch (Exception e) {
throw new ExecException(e);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Tue Apr 22 14:59:44 2014
@@ -48,7 +48,7 @@ public class POLocalRearrangeTez extends
protected String outputKey;
protected transient KeyValueWriter writer;
- protected boolean isFRJoin = false;
+ protected boolean connectedToPackage = true;
protected boolean isSkewedJoin = false;
public POLocalRearrangeTez(OperatorKey k) {
@@ -64,6 +64,7 @@ public class POLocalRearrangeTez extends
if (copy instanceof POLocalRearrangeTez) {
POLocalRearrangeTez copyTez = (POLocalRearrangeTez) copy;
this.isSkewedJoin = copyTez.isSkewedJoin;
+ this.connectedToPackage = copyTez.connectedToPackage;
this.outputKey = copyTez.outputKey;
}
}
@@ -76,12 +77,12 @@ public class POLocalRearrangeTez extends
this.outputKey = outputKey;
}
- public boolean isFRJoin() {
- return isFRJoin;
+ public boolean isConnectedToPackage() {
+ return connectedToPackage;
}
- public void setFRJoin(boolean isFRJoin) {
- this.isFRJoin = isFRJoin;
+ public void setConnectedToPackage(boolean connectedToPackage) {
+ this.connectedToPackage = connectedToPackage;
}
public boolean isSkewedJoin() {
@@ -186,6 +187,8 @@ public class POLocalRearrangeTez extends
mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(
mKey.scope)), requestedParallelism);
deepCopyTo(clone);
+ clone.isSkewedJoin = isSkewedJoin;
+ clone.connectedToPackage = connectedToPackage;
clone.setOutputKey(outputKey);
return clone;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Tue Apr 22 14:59:44 2014
@@ -54,7 +54,7 @@ public class POShuffleTezLoad extends PO
private WritableComparator comparator = null;
private boolean isSkewedJoin = false;
-
+
private transient Configuration conf;
public POShuffleTezLoad(POPackage pack) {
@@ -125,6 +125,7 @@ public class POShuffleTezLoad extends PO
hasData = true;
cur = readers.get(i).getCurrentKey();
if (min == null || comparator.compare(min, cur) > 0) {
+ //Not a deep clone. Writable is referenced.
min = ((PigNullableWritable)cur).clone();
minIndex = i;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java Tue Apr 22 14:59:44 2014
@@ -36,7 +36,9 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
/**
* POValueInputTez is used read tuples from a Tez Intermediate output from a 1-1
@@ -52,6 +54,9 @@ public class POValueInputTez extends Phy
// TODO Change this to value only reader after implementing
// value only input output
private transient KeyValueReader reader;
+ private transient KeyValuesReader shuffleReader;
+ private transient boolean shuffleInput;
+ private transient boolean hasNext;
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
public POValueInputTez(OperatorKey k) {
@@ -83,9 +88,17 @@ public class POValueInputTez extends Phy
if (input == null) {
throw new ExecException("Input from vertex " + inputKey + " is missing");
}
+
try {
- reader = (KeyValueReader) input.getReader();
- LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+ Reader r = input.getReader();
+ if (r instanceof KeyValueReader) {
+ reader = (KeyValueReader) r;
+ } else {
+ shuffleInput = true;
+ shuffleReader = (KeyValuesReader) r;
+ hasNext = shuffleReader.next();
+ }
+ LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r);
} catch (Exception e) {
throw new ExecException(e);
}
@@ -97,20 +110,30 @@ public class POValueInputTez extends Phy
if (finished) {
return RESULT_EOP;
}
- if (reader.next()) {
- Tuple origTuple = (Tuple)reader.getCurrentValue();
- Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
- return new Result(POStatus.STATUS_OK, copy);
+ if (shuffleInput) {
+ while (hasNext) {
+ if (shuffleReader.getCurrentValues().iterator().hasNext()) {
+ Tuple origTuple = (Tuple)shuffleReader.getCurrentValues().iterator().next();
+ Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
+ }
+ hasNext = shuffleReader.next();
+ }
} else {
- finished = true;
- // For certain operators (such as STREAM), we could still have some work
- // to do even after seeing the last input. These operators set a flag that
- // says all input has been sent and to run the pipeline one more time.
- if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
- this.parentPlan.endOfAllInput = true;
+ if (reader.next()) {
+ Tuple origTuple = (Tuple)reader.getCurrentValue();
+ Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
}
- return RESULT_EOP;
}
+ finished = true;
+ // For certain operators (such as STREAM), we could still have some work
+ // to do even after seeing the last input. These operators set a flag that
+ // says all input has been sent and to run the pipeline one more time.
+ if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
+ this.parentPlan.endOfAllInput = true;
+ }
+ return RESULT_EOP;
} catch (IOException e) {
throw new ExecException(e);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Tue Apr 22 14:59:44 2014
@@ -99,6 +99,10 @@ public class POValueOutputTez extends Ph
outputKeys.remove(outputKey);
}
+ public boolean containsOutputKey(String outputKey) {
+ return outputKeys.contains(outputKey);
+ }
+
@Override
public Result getNextTuple() throws ExecException {
Result inp;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Apr 22 14:59:44 2014
@@ -668,7 +668,7 @@ public class TezCompiler extends PhyPlan
if (!tezOp.isClosed()) {
POLocalRearrangeTez lr = new POLocalRearrangeTez(op.getLRs()[i]);
lr.setOutputKey(curTezOp.getOperatorKey().toString());
- lr.setFRJoin(true);
+ lr.setConnectedToPackage(false);
tezOp.plan.addAsLeaf(lr);
TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
@@ -1518,8 +1518,9 @@ public class TezCompiler extends PhyPlan
POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
oper.plan.addAsLeaf(nfe1);
+ String numSamples = pigContext.getProperties().getProperty(PigConfiguration.PIG_RANDOM_SAMPLER_SAMPLE_SIZE, "100");
POReservoirSample poSample = new POReservoirSample(new OperatorKey(scope,nig.getNextNodeId(scope)),
- -1, null, 100);
+ -1, null, Integer.parseInt(numSamples));
oper.plan.addAsLeaf(poSample);
lrSample.setOutputKey(curTezOp.getOperatorKey().toString());
oper.plan.addAsLeaf(lrSample);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Apr 22 14:59:44 2014
@@ -74,6 +74,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
@@ -112,8 +113,8 @@ import org.apache.tez.mapreduce.hadoop.M
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.SortedGroupedMergedInput;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
/**
* A visitor to construct DAG out of Tez plan.
@@ -223,8 +224,9 @@ public class TezDagBuilder extends TezOp
EdgeProperty edgeProperty = newEdge(fromOp, toOp);
String groupInputClass = edgeProperty.getDataMovementType().equals(
- DataMovementType.SCATTER_GATHER) ? ConcatenatedMergedKeyValuesInput.class
- .getName() : ConcatenatedMergedKeyValueInput.class.getName();
+ DataMovementType.SCATTER_GATHER)
+ ? SortedGroupedMergedInput.class.getName()
+ : ConcatenatedMergedKeyValueInput.class.getName();
return new GroupInputEdge(from, to, edgeProperty,
new InputDescriptor(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
}
@@ -457,10 +459,9 @@ public class TezDagBuilder extends TezOp
LinkedList<POLocalRearrangeTez> lrs =
PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
- if (lr.isFRJoin()) {
- // skip FR join input
- } else if (lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
- localRearrangeMap.put((int)lr.getIndex(), inputKey);
+ if (lr.isConnectedToPackage()
+ && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+ localRearrangeMap.put((int) lr.getIndex(), inputKey);
}
}
}
@@ -507,12 +508,27 @@ public class TezDagBuilder extends TezOp
MRToTezHelper.convertMRToTezRuntimeConf(payloadConf, globalConf);
+ if (!pc.inIllustrator) {
+ for (POStore store : stores) {
+ // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized
+ store.setInputs(null);
+ store.setParentPlan(null);
+ }
+ // We put them in the reduce because PigOutputCommitter checks the
+ // ID of the task to see if it's a map, and if not, calls the reduce
+ // committers.
+ payloadConf.set(JobControlCompiler.PIG_MAP_STORES,
+ ObjectSerializer.serialize(new ArrayList<POStore>()));
+ payloadConf.set(JobControlCompiler.PIG_REDUCE_STORES,
+ ObjectSerializer.serialize(stores));
+ }
+
// Take our assembled configuration and create a vertex
byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
procDesc.setUserPayload(userPayload);
// Can only set parallelism here if the parallelism isn't derived from
// splits
- int parallelism = Math.max(tezOp.getRequestedParallelism(), 1);
+ int parallelism = tezOp.getRequestedParallelism();
InputSplitInfo inputSplitInfo = null;
if (loads != null && loads.size() > 0) {
// Not using MRInputAMSplitGenerator because delegation tokens are
@@ -523,6 +539,26 @@ public class TezDagBuilder extends TezOp
parallelism = inputSplitInfo.getNumTasks();
tezOp.setRequestedParallelism(parallelism);
}
+ if (tezOp.getRequestedParallelism() < 0) {
+ if (pc.defaultParallel > 0) {
+ parallelism = pc.defaultParallel;
+ } else {
+ // Rough estimation till we have Automatic Reducer Parallelism
+ // and Parallelism estimator. To be removed.
+ int sumOfPredParallelism = 0;
+ int predParallelism;
+ for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
+ predParallelism = pred.getRequestedParallelism();
+ if (predParallelism < 0) {
+ predParallelism = Math.max(pc.defaultParallel, 1);
+ }
+ sumOfPredParallelism += predParallelism;
+ }
+ sumOfPredParallelism = Math.min(sumOfPredParallelism, 20);
+ parallelism = Math.max(sumOfPredParallelism, 1);
+ }
+ tezOp.setRequestedParallelism(parallelism);
+ }
Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, parallelism,
isMap ? MRHelpers.getMapResource(globalConf) : MRHelpers.getReduceResource(globalConf));
@@ -705,6 +741,12 @@ public class TezDagBuilder extends TezOp
JobControlCompiler.LOG_DIR).toString());
}
payloadConf.set("pig.streaming.task.output.dir", outputPathString);
+
+ if(tezOp.plan.getLeaves().get(0) instanceof POSplit) {
+ // Set this so that we get correct counters
+ st.setMultiStore(true);
+ }
+
} else { // multi store case
log.info("Setting up multi store job");
String tmpLocationStr = FileLocalizer.getTemporaryPath(pc)
@@ -729,17 +771,6 @@ public class TezDagBuilder extends TezOp
tmpLocation.toString());
}
- if (!pc.inIllustrator) {
-
- // We put them in the reduce because PigOutputCommitter checks the
- // ID of the task to see if it's a map, and if not, calls the reduce
- // committers.
- payloadConf.set(JobControlCompiler.PIG_MAP_STORES,
- ObjectSerializer.serialize(new ArrayList<POStore>()));
- payloadConf.set(JobControlCompiler.PIG_REDUCE_STORES,
- ObjectSerializer.serialize(stores));
- }
-
}
return stores;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Tue Apr 22 14:59:44 2014
@@ -195,7 +195,7 @@ public class TezLauncher extends Launche
}
boolean isUnionOpt = "true".equalsIgnoreCase(pc.getProperties()
- .getProperty(PigConfiguration.TEZ_OPT_UNION, "false"));
+ .getProperty(PigConfiguration.TEZ_OPT_UNION, "true"));
// Use VertexGroup in Tez
if (isUnionOpt) {
UnionOptimizer uo = new UnionOptimizer(tezPlan);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Tue Apr 22 14:59:44 2014
@@ -415,6 +415,10 @@ public class TezOperator extends Operato
this.inputKeys.add(input);
}
+ public boolean removeInput(OperatorKey input) {
+ return this.inputKeys.remove(input);
+ }
+
public String getOutput() {
return outputKey;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Tue Apr 22 14:59:44 2014
@@ -22,10 +22,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.RoundRobinPartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.TezOpPlanVisitor;
@@ -36,6 +40,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
/**
* Optimizes union by removing the intermediate union vertex and making the
@@ -66,6 +73,7 @@ public class UnionOptimizer extends TezO
}
TezOperator unionOp = tezOp;
+ String unionOpKey = unionOp.getOperatorKey().toString();
String scope = unionOp.getOperatorKey().scope;
TezOperPlan tezPlan = getPlan();
@@ -124,16 +132,28 @@ public class UnionOptimizer extends TezO
for (OperatorKey predKey : unionOp.getVertexGroupMembers()) {
TezOperator pred = tezPlan.getOperator(predKey);
PhysicalPlan predPlan = pred.plan;
- // Remove POValueOutputTez from predecessor leaf
- predPlan.remove(predPlan.getLeaves().get(0));
-
PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
+ // if predLeaf not POValueOutputTez
+ if (predLeaf instanceof POSplit) {
+ // Find the subPlan that connects to the union operator
+ predPlan = getUnionPredPlanFromSplit(predPlan, unionOpKey);
+ predLeaf = predPlan.getLeaves().get(0);
+ }
+
PhysicalPlan clonePlan = unionOpPlan.clone();
//Clone changes the operator keys
List<POStoreTez> clonedUnionStoreOutputs = PlanHelper.getPhysicalOperators(clonePlan, POStoreTez.class);
+ // Remove POValueOutputTez from predecessor leaf
+ predPlan.remove(predLeaf);
+ boolean isEmptyPlan = predPlan.isEmpty();
+ if (!isEmptyPlan) {
+ predLeaf = predPlan.getLeaves().get(0);
+ }
predPlan.merge(clonePlan);
- predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
+ if (!isEmptyPlan) {
+ predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
+ }
// Connect predecessor to the storeVertexGroups
int i = 0;
@@ -153,25 +173,73 @@ public class UnionOptimizer extends TezO
tezPlan.connect(pred, outputVertexGroup);
}
+ copyOperatorProperties(pred, unionOp);
tezPlan.disconnect(pred, unionOp);
}
+ List<TezOperator> successors = tezPlan.getSuccessors(unionOp);
+ List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
+ for (TezOutput tezOutput : unionOutputs) {
+ if (tezOutput instanceof POValueOutputTez) {
+ valueOnlyOutputs.add(tezOutput);
+ }
+ }
+ // Connect to outputVertexGroupOps
// Copy output edges of union -> successor to predecessor->successor, vertexgroup -> successor
// and connect vertexgroup -> successor in the plan.
for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) {
TezOperator succOp = tezPlan.getOperator(entry.getKey());
+ // Case of union followed by union.
+ // unionOp.outEdges will not point to vertex group, but to its output.
+ // So find the vertex group if there is one.
+ TezOperator succOpVertexGroup = null;
+ for (TezOperator succ : successors) {
+ if (succ.isVertexGroup()
+ && succ.getVertexGroupInfo().getOutput()
+ .equals(succOp.getOperatorKey().toString())) {
+ succOpVertexGroup = succ;
+ break;
+ }
+ }
+ TezEdgeDescriptor edge = entry.getValue();
+ // Edge cannot be one to one as it will get input from two or
+ // more union predecessors. Change it to SCATTER_GATHER
+ if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
+ edge.dataMovementType = DataMovementType.SCATTER_GATHER;
+ edge.partitionerClass = RoundRobinPartitioner.class;
+ edge.outputClassName = OnFileSortedOutput.class.getName();
+ edge.inputClassName = ShuffledMergedInput.class.getName();
+
+ for (TezOutput tezOutput : valueOnlyOutputs) {
+ if (ArrayUtils.contains(tezOutput.getTezOutputs(), entry.getKey().toString())) {
+ edge.setIntermediateOutputKeyComparatorClass(
+ POValueOutputTez.EmptyWritableComparator.class.getName());
+ }
+ }
+ }
TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
- // Required for create the Edge in TezDAGBuilder
for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
TezOperator pred = tezPlan.getOperator(predKey);
- pred.outEdges.put(entry.getKey(), entry.getValue());
- succOp.inEdges.put(predKey, entry.getValue());
+ // Keep the output edge directly to successor
+ // Don't need to keep output edge for vertexgroup
+ pred.outEdges.put(entry.getKey(), edge);
+ succOp.inEdges.put(predKey, edge);
+ if (succOpVertexGroup != null) {
+ succOpVertexGroup.getVertexGroupMembers().add(predKey);
+ succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
+ // Connect directly to the successor vertex group
+ tezPlan.disconnect(pred, vertexGroupOp);
+ tezPlan.connect(pred, succOpVertexGroup);
+ }
+ }
+ if (succOpVertexGroup != null) {
+ succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+ succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
+ //Discard the new vertex group created
+ tezPlan.remove(vertexGroupOp);
+ } else {
+ tezPlan.connect(vertexGroupOp, succOp);
}
- // Not used in TezDAGBuilder. Just setting for correctness.
- vertexGroupOp.outEdges.put(entry.getKey(), entry.getValue());
- succOp.inEdges.put(vertexGroupOp.getOperatorKey(), entry.getValue());
-
- tezPlan.connect(vertexGroupOp, succOp);
}
} catch (Exception e) {
throw new VisitorException(e);
@@ -186,7 +254,7 @@ public class UnionOptimizer extends TezO
LinkedList<TezInput> inputs = PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
for (TezInput input : inputs) {
for (String key : input.getTezInputs()) {
- if (key.equals(unionOp.getOperatorKey().toString())) {
+ if (key.equals(unionOpKey)) {
input.replaceInput(key,
newOutputKeys[unionOutputKeys.indexOf(succ.getOperatorKey().toString())]);
}
@@ -201,4 +269,25 @@ public class UnionOptimizer extends TezO
}
+    /**
+     * Carries over the state that the union vertex's plan needs once a clone of
+     * that plan has been merged into a predecessor vertex: the UDFs and scalars
+     * it references.
+     *
+     * Settings that describe how the union vertex consumes its own input edges
+     * (for example {@code useSecondaryKey}) are intentionally NOT copied. The
+     * union vertex is disconnected from its predecessors and dropped by this
+     * optimizer, so its input-side configuration does not apply to the
+     * predecessor. Copying it would overwrite the predecessor's own flag and
+     * could silently discard a secondary sort the predecessor was set up with.
+     *
+     * @param pred    predecessor vertex that now hosts a clone of the union plan
+     * @param unionOp union vertex whose plan was merged into {@code pred}
+     */
+    private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) {
+        pred.UDFs.addAll(unionOp.UDFs);
+        pred.scalars.addAll(unionOp.scalars);
+    }
+
+    /**
+     * Locates, among the POSplit operators of a predecessor's plan, the nested
+     * sub-plan whose leaf is a POValueOutputTez writing to the given union
+     * vertex.
+     *
+     * @param plan       predecessor vertex plan whose leaf is a POSplit
+     * @param unionOpKey string form of the union vertex's operator key
+     * @return the split sub-plan that outputs to the union vertex
+     * @throws VisitorException if no split sub-plan outputs to the union vertex
+     */
+    public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, String unionOpKey) throws VisitorException {
+        List<POSplit> splits = PlanHelper.getPhysicalOperators(plan, POSplit.class);
+        for (POSplit split : splits) {
+            for (PhysicalPlan subPlan : split.getPlans()) {
+                // Skip empty sub-plans so a miss is reported as the documented
+                // VisitorException rather than an IndexOutOfBoundsException.
+                List<PhysicalOperator> leaves = subPlan.getLeaves();
+                if (leaves == null || leaves.isEmpty()) {
+                    continue;
+                }
+                PhysicalOperator leaf = leaves.get(0);
+                if (leaf instanceof POValueOutputTez
+                        && ((POValueOutputTez) leaf).containsOutputKey(unionOpKey)) {
+                    return subPlan;
+                }
+            }
+        }
+        throw new VisitorException("Did not find the union predecessor in the split plan");
+    }
+
}
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Tue Apr 22 14:59:44 2014
@@ -92,14 +92,9 @@ public class TezTaskStats extends JobSta
long hdfsBytesRead = -1;
String filename = fs.getFileName();
Map<String, Long> taskCounter = map.get(TASK_COUNTER_GROUP);
- if (taskCounter != null) {
- //TaskCounter.INPUT_RECORDS_PROCESSED.name()
- if (taskCounter.get("INPUT_RECORDS_PROCESSED") != null) {
- records = taskCounter.get("INPUT_RECORDS_PROCESSED");
- } else if (taskCounter.get(PigStatsUtil.MAP_INPUT_RECORDS) != null) {
- // Tez 0.3 has MAP_INPUT_RECORDS TODO: Remove after we move away from Tez 0.3
- records = taskCounter.get(PigStatsUtil.MAP_INPUT_RECORDS);
- }
+ if (taskCounter != null
+ && taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
+ records = taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
}
if (map.get(FS_COUNTER_GROUP) !=null &&
map.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
@@ -124,13 +119,9 @@ public class TezTaskStats extends JobSta
if (sto.isMultiStore()) {
Long n = map.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP).get(PigStatsUtil.getMultiStoreCounterName(sto));
if (n != null) records = n;
- } else if (map.get(TASK_COUNTER_GROUP) != null) {
- if(map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
- records = map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
- } else if(map.get(TASK_COUNTER_GROUP).get(PigStatsUtil.MAP_OUTPUT_RECORDS) != null) {
- // Tez 0.3 has MAP_OUTPUT_RECORDS TODO: Remove after we move away from Tez 0.3
- records = map.get(TASK_COUNTER_GROUP).get(PigStatsUtil.MAP_OUTPUT_RECORDS);
- }
+ } else if (map.get(TASK_COUNTER_GROUP) != null
+ && map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
+ records = map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
}
/*
if (map.get(FS_COUNTER_GROUP)!= null &&
Modified: pig/branches/tez/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/nightly.conf?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/nightly.conf Tue Apr 22 14:59:44 2014
@@ -1360,6 +1360,101 @@ d = foreach b generate name, age;
e = union c, d;
store e into ':OUTPATH:';\,
},
+ {
+ 'num' => 2,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = group c by name;
+e = foreach d generate group, SUM(c.age);
+store e into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 3,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = group c by name;
+e = foreach d { f = order c by $1,$2; generate group, f; };
+store e into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 4,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = order c by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 5,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+split a into a1 if age < 50, a2 otherwise;
+c = union a1, b;
+d = order c by name PARALLEL 2;
+store a2 into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+ },
+ {
+ 'num' => 6,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join c by name, d by name PARALLEL 2;
+store e into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 7,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join c by name, d by name using 'replicated';
+store e into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 8,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by name, c by name using 'replicated';
+store e into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 9,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join c by name, d by name using 'skewed' PARALLEL 5;
+store e into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 10,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by name, c by name using 'skewed' PARALLEL 5;
+store e into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 11,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = foreach a generate name, age;
+e = foreach b generate name, age;
+f = foreach c generate name, age;
+g = union d, e;
+h = union f, g;
+i = group h by name;
+i = foreach i generate group, SUM(h.age);
+store i into ':OUTPATH:';\,
+ },
]
},
{
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld Tue Apr 22 14:59:44 2014
@@ -2,310 +2,307 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-412
+# TEZ DAG plan: scope-418
#--------------------------------------------------
-Tez vertex scope-316 -> Tez vertex scope-318,Tez vertex scope-329,Tez vertex scope-340,
-Tez vertex scope-318 -> Tez vertex scope-321,Tez vertex scope-334,Tez vertex scope-345,
-Tez vertex scope-345 -> Tez vertex scope-348,Tez vertex scope-372,
-Tez vertex scope-348 -> Tez vertex scope-366,Tez vertex scope-356,
-Tez vertex scope-356 -> Tez vertex scope-366,
-Tez vertex scope-366 -> Tez vertex scope-368,
-Tez vertex scope-368
-Tez vertex scope-340 -> Tez vertex scope-343,Tez vertex scope-384,
-Tez vertex scope-384 -> Tez vertex scope-388,
-Tez vertex scope-372 -> Tez vertex scope-376,
-Tez vertex scope-376 -> Tez vertex scope-382,Tez vertex scope-386,
-Tez vertex scope-386 -> Tez vertex scope-388,
-Tez vertex scope-388
-Tez vertex scope-329 -> Tez vertex scope-332,Tez vertex scope-337,
+Tez vertex scope-319 -> Tez vertex scope-321,Tez vertex scope-332,Tez vertex scope-343,
+Tez vertex scope-332 -> Tez vertex scope-335,Tez vertex scope-340,
+Tez vertex scope-335 -> Tez vertex scope-339,
+Tez vertex scope-321 -> Tez vertex scope-324,Tez vertex scope-337,Tez vertex scope-348,
Tez vertex scope-337 -> Tez vertex scope-339,
Tez vertex scope-339
-Tez vertex scope-382
-Tez vertex scope-321 -> Tez vertex scope-323,
-Tez vertex scope-323 -> Tez vertex scope-325,Tez vertex scope-327,
-Tez vertex scope-327
-Tez vertex scope-325
-Tez vertex scope-343
-Tez vertex scope-332 -> Tez vertex scope-336,
-Tez vertex scope-334 -> Tez vertex scope-336,
-Tez vertex scope-336
+Tez vertex scope-348 -> Tez vertex scope-351,Tez vertex scope-375,
+Tez vertex scope-375 -> Tez vertex scope-379,
+Tez vertex scope-379 -> Tez vertex scope-385,Tez vertex scope-389,
+Tez vertex scope-385
+Tez vertex scope-343 -> Tez vertex scope-346,Tez vertex scope-387,
+Tez vertex scope-387 -> Tez vertex scope-415,
+Tez vertex scope-389 -> Tez vertex scope-415,
+Tez vertex scope-415
+Tez vertex scope-346
+Tez vertex scope-340 -> Tez vertex scope-342,
+Tez vertex scope-342
+Tez vertex scope-324 -> Tez vertex scope-326,
+Tez vertex scope-326 -> Tez vertex scope-328,Tez vertex scope-330,
+Tez vertex scope-328
+Tez vertex scope-351 -> Tez vertex scope-369,Tez vertex scope-359,
+Tez vertex scope-359 -> Tez vertex scope-369,
+Tez vertex scope-369 -> Tez vertex scope-371,
+Tez vertex scope-371
+Tez vertex scope-330
-Tez vertex scope-316
+Tez vertex scope-319
# Plan on vertex
-POValueOutputTez - scope-317 -> [scope-340, scope-318, scope-329]
+POValueOutputTez - scope-320 -> [scope-321, scope-332, scope-343]
|
-|---a: New For Each(false,false)[bag] - scope-217
+|---a: New For Each(false,false)[bag] - scope-220
| |
- | Cast[int] - scope-212
+ | Cast[int] - scope-215
| |
- | |---Project[bytearray][0] - scope-211
+ | |---Project[bytearray][0] - scope-214
| |
- | Cast[int] - scope-215
+ | Cast[int] - scope-218
| |
- | |---Project[bytearray][1] - scope-214
+ | |---Project[bytearray][1] - scope-217
|
- |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-210
-Tez vertex scope-318
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-213
+Tez vertex scope-332
# Plan on vertex
-POValueOutputTez - scope-320 -> [scope-321, scope-345, scope-334]
+POValueOutputTez - scope-334 -> [scope-340, scope-335]
|
-|---b: Filter[bag] - scope-219
+|---c: Filter[bag] - scope-247
| |
- | Less Than or Equal[boolean] - scope-222
+ | Less Than or Equal[boolean] - scope-250
| |
- | |---Project[int][0] - scope-220
+ | |---Project[int][0] - scope-248
| |
- | |---Constant(5) - scope-221
+ | |---Constant(10) - scope-249
|
- |---POValueInputTez - scope-319 <- scope-316
-Tez vertex scope-345
+ |---POValueInputTez - scope-333 <- scope-319
+Tez vertex scope-335
# Plan on vertex
-POValueOutputTez - scope-347 -> [scope-348, scope-372]
-|
-|---POValueInputTez - scope-346 <- scope-318
-Tez vertex scope-348
-# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-352 -> scope-356
+c1: Local Rearrange[tuple]{int}(false) - scope-260 -> scope-339
| |
-| Constant(DummyVal) - scope-351
+| Project[int][0] - scope-261
|
-|---ReservoirSample - scope-355
- |
- |---New For Each(false)[tuple] - scope-354
- | |
- | Project[int][0] - scope-353
- |
- |---e1: Local Rearrange[tuple]{int}(false) - scope-350 -> scope-366
- | |
- | Project[int][0] - scope-298
- |
- |---e: Filter[bag] - scope-294
- | |
- | Less Than[boolean] - scope-297
- | |
- | |---Project[int][0] - scope-295
- | |
- | |---Constant(3) - scope-296
- |
- |---POValueInputTez - scope-349 <- scope-345
-Tez vertex scope-356
+|---POValueInputTez - scope-336 <- scope-332
+Tez vertex scope-321
# Plan on vertex
-POValueOutputTez - scope-365 -> [scope-366]
+POValueOutputTez - scope-323 -> [scope-348, scope-337, scope-324]
|
-|---New For Each(false)[tuple] - scope-364
+|---b: Filter[bag] - scope-222
+ | |
+ | Less Than or Equal[boolean] - scope-225
| |
- | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-363
+ | |---Project[int][0] - scope-223
| |
- | |---Project[tuple][*] - scope-362
+ | |---Constant(5) - scope-224
|
- |---New For Each(false,false)[tuple] - scope-361
- | |
- | Constant(1) - scope-360
- | |
- | Project[bag][1] - scope-358
- |
- |---Package(Packager)[tuple]{bytearray} - scope-357
-Tez vertex scope-366
+ |---POValueInputTez - scope-322 <- scope-319
+Tez vertex scope-337
# Plan on vertex
-POIdentityInOutTez - scope-367 <- scope-348 -> scope-368
+c1: Local Rearrange[tuple]{int}(false) - scope-262 -> scope-339
| |
-| Project[int][0] - scope-298
-Tez vertex scope-368
-# Plan on vertex
-e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-300
+| Project[int][0] - scope-263
|
-|---New For Each(true)[tuple] - scope-371
- | |
- | Project[bag][1] - scope-370
- |
- |---Package(LitePackager)[tuple]{int} - scope-369
-Tez vertex scope-340
+|---POValueInputTez - scope-338 <- scope-321
+Tez vertex scope-339
# Plan on vertex
-POValueOutputTez - scope-342 -> [scope-384, scope-343]
+c1: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-267
|
-|---d1: Filter[bag] - scope-283
- | |
- | Equal To[boolean] - scope-286
+|---c1: New For Each(true,true)[tuple] - scope-266
| |
- | |---Project[int][0] - scope-284
+ | Project[bag][1] - scope-264
| |
- | |---Constant(5) - scope-285
+ | Project[bag][2] - scope-265
|
- |---d: Filter[bag] - scope-279
- | |
- | Greater Than[boolean] - scope-282
- | |
- | |---Project[int][0] - scope-280
- | |
- | |---Constant(10) - scope-281
- |
- |---POValueInputTez - scope-341 <- scope-316
-Tez vertex scope-384
+ |---c1: Package(Packager)[tuple]{int} - scope-259
+Tez vertex scope-348
# Plan on vertex
-POValueOutputTez - scope-390 -> [scope-388]
+POValueOutputTez - scope-350 -> [scope-375, scope-351]
|
-|---POValueInputTez - scope-385 <- scope-340
-Tez vertex scope-372
+|---POValueInputTez - scope-349 <- scope-321
+Tez vertex scope-375
# Plan on vertex
-f1: Local Rearrange[tuple]{tuple}(false) - scope-375 -> scope-376
+f1: Local Rearrange[tuple]{tuple}(false) - scope-378 -> scope-379
| |
-| Project[tuple][*] - scope-374
+| Project[tuple][*] - scope-377
|
-|---f1: Limit - scope-305
+|---f1: Limit - scope-308
|
- |---f: Filter[bag] - scope-301
+ |---f: Filter[bag] - scope-304
| |
- | Greater Than or Equal[boolean] - scope-304
+ | Greater Than or Equal[boolean] - scope-307
| |
- | |---Project[int][0] - scope-302
+ | |---Project[int][0] - scope-305
| |
- | |---Constant(3) - scope-303
+ | |---Constant(3) - scope-306
|
- |---POValueInputTez - scope-373 <- scope-345
-Tez vertex scope-376
+ |---POValueInputTez - scope-376 <- scope-348
+Tez vertex scope-379
# Plan on vertex
-POValueOutputTez - scope-381 -> [scope-386, scope-382]
+POValueOutputTez - scope-384 -> [scope-389, scope-385]
|
-|---f1: Limit - scope-380
+|---f1: Limit - scope-383
|
- |---f1: New For Each(true)[bag] - scope-379
+ |---f1: New For Each(true)[bag] - scope-382
| |
- | Project[tuple][1] - scope-378
+ | Project[tuple][1] - scope-381
|
- |---f1: Package(Packager)[tuple]{tuple} - scope-377
-Tez vertex scope-386
+ |---f1: Package(Packager)[tuple]{tuple} - scope-380
+Tez vertex scope-385
# Plan on vertex
-POValueOutputTez - scope-391 -> [scope-388]
+f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-312
|
-|---POValueInputTez - scope-387 <- scope-376
-Tez vertex scope-388
-# Plan on vertex
-f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-315
-|
-|---POShuffledValueInputTez - scope-389 <- [scope-386, scope-384]
-Tez vertex scope-329
+|---POValueInputTez - scope-386 <- scope-379
+Tez vertex scope-343
# Plan on vertex
-POValueOutputTez - scope-331 -> [scope-337, scope-332]
+POValueOutputTez - scope-345 -> [scope-387, scope-346]
|
-|---c: Filter[bag] - scope-244
+|---d1: Filter[bag] - scope-286
| |
- | Less Than or Equal[boolean] - scope-247
+ | Equal To[boolean] - scope-289
| |
- | |---Project[int][0] - scope-245
+ | |---Project[int][0] - scope-287
| |
- | |---Constant(10) - scope-246
+ | |---Constant(5) - scope-288
|
- |---POValueInputTez - scope-330 <- scope-316
-Tez vertex scope-337
+ |---d: Filter[bag] - scope-282
+ | |
+ | Greater Than[boolean] - scope-285
+ | |
+ | |---Project[int][0] - scope-283
+ | |
+ | |---Constant(10) - scope-284
+ |
+ |---POValueInputTez - scope-344 <- scope-319
+Tez vertex scope-387
# Plan on vertex
-c2: Local Rearrange[tuple]{int}(false) - scope-404 -> scope-339
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-416
+|
+|---POValueInputTez - scope-388 <- scope-343
+Tez vertex scope-389
+# Plan on vertex
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-417
+|
+|---POValueInputTez - scope-390 <- scope-379
+Tez vertex group scope-415 <- [scope-387, scope-389] -> null
+# No plan on vertex group
+Tez vertex scope-346
+# Plan on vertex
+d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-293
+|
+|---POValueInputTez - scope-347 <- scope-343
+Tez vertex scope-340
+# Plan on vertex
+c2: Local Rearrange[tuple]{int}(false) - scope-407 -> scope-342
| |
-| Project[int][0] - scope-406
+| Project[int][0] - scope-409
|
-|---c3: New For Each(false,false)[bag] - scope-392
+|---c3: New For Each(false,false)[bag] - scope-395
| |
- | Project[int][0] - scope-393
+ | Project[int][0] - scope-396
| |
- | POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-394
+ | POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-397
| |
- | |---Project[bag][0] - scope-395
+ | |---Project[bag][0] - scope-398
| |
- | |---Project[bag][1] - scope-396
+ | |---Project[bag][1] - scope-399
|
- |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-407
+ |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-410
|
- |---POValueInputTez - scope-338 <- scope-329
-Tez vertex scope-339
-# Combine plan on edge <scope-337>
-c2: Local Rearrange[tuple]{int}(false) - scope-408 -> scope-339
+ |---POValueInputTez - scope-341 <- scope-332
+Tez vertex scope-342
+# Combine plan on edge <scope-340>
+c2: Local Rearrange[tuple]{int}(false) - scope-411 -> scope-342
| |
-| Project[int][0] - scope-410
+| Project[int][0] - scope-413
|
-|---c3: New For Each(false,false)[bag] - scope-397
+|---c3: New For Each(false,false)[bag] - scope-400
| |
- | Project[int][0] - scope-398
+ | Project[int][0] - scope-401
| |
- | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-399
+ | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-402
| |
- | |---Project[bag][1] - scope-400
+ | |---Project[bag][1] - scope-403
|
- |---c2: Package(CombinerPackager)[tuple]{int} - scope-403
+ |---c2: Package(CombinerPackager)[tuple]{int} - scope-406
# Plan on vertex
-c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-278
+c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-281
|
-|---c3: New For Each(false,false)[bag] - scope-277
+|---c3: New For Each(false,false)[bag] - scope-280
| |
- | Project[int][0] - scope-271
+ | Project[int][0] - scope-274
| |
- | POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-275
+ | POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-278
| |
- | |---Project[bag][1] - scope-401
+ | |---Project[bag][1] - scope-404
|
- |---c2: Package(CombinerPackager)[tuple]{int} - scope-268
-Tez vertex scope-382
-# Plan on vertex
-f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-309
-|
-|---POValueInputTez - scope-383 <- scope-376
-Tez vertex scope-321
+ |---c2: Package(CombinerPackager)[tuple]{int} - scope-271
+Tez vertex scope-324
# Plan on vertex
-b1: Local Rearrange[tuple]{int}(false) - scope-228 -> scope-323
+b1: Local Rearrange[tuple]{int}(false) - scope-231 -> scope-326
| |
-| Project[int][0] - scope-229
+| Project[int][0] - scope-232
|
-|---POValueInputTez - scope-322 <- scope-318
-Tez vertex scope-323
+|---POValueInputTez - scope-325 <- scope-321
+Tez vertex scope-326
# Plan on vertex
-POValueOutputTez - scope-324 -> [scope-327, scope-325]
+POValueOutputTez - scope-327 -> [scope-330, scope-328]
|
-|---b1: Package(Packager)[tuple]{int} - scope-227
-Tez vertex scope-327
+|---b1: Package(Packager)[tuple]{int} - scope-230
+Tez vertex scope-328
# Plan on vertex
-b2: Store(file:///tmp/output/b2:org.apache.pig.builtin.PigStorage) - scope-243
+b1: Store(file:///tmp/output/b1:org.apache.pig.builtin.PigStorage) - scope-236
|
-|---b2: New For Each(false,false)[bag] - scope-242
- | |
- | Project[int][0] - scope-236
- | |
- | POUserFunc(org.apache.pig.builtin.LongSum)[long] - scope-240
- | |
- | |---Project[bag][0] - scope-239
- | |
- | |---Project[bag][1] - scope-238
- |
- |---POValueInputTez - scope-328 <- scope-323
-Tez vertex scope-325
+|---POValueInputTez - scope-329 <- scope-326
+Tez vertex scope-351
# Plan on vertex
-b1: Store(file:///tmp/output/b1:org.apache.pig.builtin.PigStorage) - scope-233
+Local Rearrange[tuple]{tuple}(false) - scope-355 -> scope-359
+| |
+| Constant(DummyVal) - scope-354
|
-|---POValueInputTez - scope-326 <- scope-323
-Tez vertex scope-343
+|---ReservoirSample - scope-358
+ |
+ |---New For Each(false)[tuple] - scope-357
+ | |
+ | Project[int][0] - scope-356
+ |
+ |---e1: Local Rearrange[tuple]{int}(false) - scope-353 -> scope-369
+ | |
+ | Project[int][0] - scope-301
+ |
+ |---e: Filter[bag] - scope-297
+ | |
+ | Less Than[boolean] - scope-300
+ | |
+ | |---Project[int][0] - scope-298
+ | |
+ | |---Constant(3) - scope-299
+ |
+ |---POValueInputTez - scope-352 <- scope-348
+Tez vertex scope-359
# Plan on vertex
-d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-290
+POValueOutputTez - scope-368 -> [scope-369]
|
-|---POValueInputTez - scope-344 <- scope-340
-Tez vertex scope-332
+|---New For Each(false)[tuple] - scope-367
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-366
+ | |
+ | |---Project[tuple][*] - scope-365
+ |
+ |---New For Each(false,false)[tuple] - scope-364
+ | |
+ | Constant(1) - scope-363
+ | |
+ | Project[bag][1] - scope-361
+ |
+ |---Package(Packager)[tuple]{bytearray} - scope-360
+Tez vertex scope-369
# Plan on vertex
-c1: Local Rearrange[tuple]{int}(false) - scope-257 -> scope-336
+POIdentityInOutTez - scope-370 <- scope-351 -> scope-371
| |
-| Project[int][0] - scope-258
-|
-|---POValueInputTez - scope-333 <- scope-329
-Tez vertex scope-334
+| Project[int][0] - scope-301
+Tez vertex scope-371
# Plan on vertex
-c1: Local Rearrange[tuple]{int}(false) - scope-259 -> scope-336
-| |
-| Project[int][0] - scope-260
+e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-303
|
-|---POValueInputTez - scope-335 <- scope-318
-Tez vertex scope-336
+|---New For Each(true)[tuple] - scope-374
+ | |
+ | Project[bag][1] - scope-373
+ |
+ |---Package(LitePackager)[tuple]{int} - scope-372
+Tez vertex scope-330
# Plan on vertex
-c1: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-264
+b2: Store(file:///tmp/output/b2:org.apache.pig.builtin.PigStorage) - scope-246
|
-|---c1: New For Each(true,true)[tuple] - scope-263
+|---b2: New For Each(false,false)[bag] - scope-245
| |
- | Project[bag][1] - scope-261
+ | Project[int][0] - scope-239
| |
- | Project[bag][2] - scope-262
+ | POUserFunc(org.apache.pig.builtin.LongSum)[long] - scope-243
+ | |
+ | |---Project[bag][0] - scope-242
+ | |
+ | |---Project[bag][1] - scope-241
|
- |---c1: Package(Packager)[tuple]{int} - scope-256
+ |---POValueInputTez - scope-331 <- scope-326
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld Tue Apr 22 14:59:44 2014
@@ -2,9 +2,11 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-209
+# TEZ DAG plan: scope-212
#--------------------------------------------------
-Tez vertex scope-106 -> Tez vertex scope-119,Tez vertex scope-113,Tez vertex scope-126,Tez vertex scope-156,Tez vertex scope-146,Tez vertex scope-166,Tez vertex scope-178,
+Tez vertex scope-106 -> Tez vertex scope-119,Tez vertex scope-113,Tez vertex scope-126,Tez vertex scope-156,Tez vertex scope-146,Tez vertex scope-166,Tez vertex scope-209,
+Tez vertex scope-166 -> Tez vertex scope-209,
+Tez vertex scope-209
Tez vertex scope-113
Tez vertex scope-119 -> Tez vertex scope-126,Tez vertex scope-129,
Tez vertex scope-126
@@ -12,8 +14,6 @@ Tez vertex scope-129
Tez vertex scope-146 -> Tez vertex scope-156,
Tez vertex scope-156 -> Tez vertex scope-158,
Tez vertex scope-158
-Tez vertex scope-166 -> Tez vertex scope-178,
-Tez vertex scope-178
Tez vertex scope-106
# Plan on vertex
@@ -79,7 +79,7 @@ Tez vertex scope-106
| | |
| | d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-80
| | |
-| | POValueOutputTez - scope-180 -> [scope-178]
+| | f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-210
| |
| |---d1: Filter[bag] - scope-73
| | |
@@ -110,6 +110,23 @@ Tez vertex scope-106
| |---Project[bytearray][1] - scope-4
|
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-166
+# Plan on vertex
+f1: Split - scope-203
+| |
+| f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
+| |
+| f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-211
+|
+|---f1: Limit - scope-170
+ |
+ |---f1: New For Each(true)[bag] - scope-169
+ | |
+ | Project[tuple][1] - scope-168
+ |
+ |---f1: Package(Packager)[tuple]{tuple} - scope-167
+Tez vertex group scope-209 <- [scope-106, scope-166] -> null
+# No plan on vertex group
Tez vertex scope-113
# Plan on vertex
b1: Split - scope-202
@@ -231,23 +248,3 @@ e1: Store(file:///tmp/output/e1:org.apac
| Project[bag][1] - scope-160
|
|---Package(LitePackager)[tuple]{int} - scope-159
-Tez vertex scope-166
-# Plan on vertex
-f1: Split - scope-203
-| |
-| f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
-| |
-| POValueOutputTez - scope-181 -> [scope-178]
-|
-|---f1: Limit - scope-170
- |
- |---f1: New For Each(true)[bag] - scope-169
- | |
- | Project[tuple][1] - scope-168
- |
- |---f1: Package(Packager)[tuple]{tuple} - scope-167
-Tez vertex scope-178
-# Plan on vertex
-f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
-|
-|---POShuffledValueInputTez - scope-179 <- [scope-106, scope-166]
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld?rev=1589152&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld Tue Apr 22 14:59:44 2014
@@ -0,0 +1,75 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-43
+#--------------------------------------------------
+Tez vertex scope-31 -> Tez vertex scope-33,
+Tez vertex scope-32 -> Tez vertex scope-33,
+Tez vertex scope-33 -> Tez vertex scope-38,
+Tez vertex scope-37 -> Tez vertex scope-38,
+Tez vertex scope-38 -> Tez vertex scope-42,
+Tez vertex scope-42
+
+Tez vertex scope-31
+# Plan on vertex
+POValueOutputTez - scope-35 -> [scope-33]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[chararray] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-32
+# Plan on vertex
+POValueOutputTez - scope-36 -> [scope-33]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][1] - scope-9
+ | |
+ | Cast[chararray] - scope-13
+ | |
+ | |---Project[bytearray][0] - scope-12
+ |
+ |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-33
+# Plan on vertex
+POValueOutputTez - scope-40 -> [scope-38]
+|
+|---POShuffledValueInputTez - scope-34 <- [scope-31, scope-32]
+Tez vertex scope-37
+# Plan on vertex
+POValueOutputTez - scope-41 -> [scope-38]
+|
+|---d: New For Each(false,false)[bag] - scope-24
+ | |
+ | Cast[int] - scope-19
+ | |
+ | |---Project[bytearray][0] - scope-18
+ | |
+ | Cast[chararray] - scope-22
+ | |
+ | |---Project[bytearray][1] - scope-21
+ |
+ |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-38
+# Plan on vertex
+f: Local Rearrange[tuple]{int}(false) - scope-28 -> scope-42
+| |
+| Project[int][0] - scope-29
+|
+|---POShuffledValueInputTez - scope-39 <- [scope-37, scope-33]
+Tez vertex scope-42
+# Plan on vertex
+f: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-30
+|
+|---f: Package(Packager)[tuple]{int} - scope-27
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld?rev=1589152&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld Tue Apr 22 14:59:44 2014
@@ -0,0 +1,70 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-53
+#--------------------------------------------------
+Tez vertex scope-37 -> Tez vertex scope-43,
+Tez vertex scope-31 -> Tez vertex scope-43,
+Tez vertex scope-32 -> Tez vertex scope-43,
+Tez vertex scope-43 -> Tez vertex scope-42,
+Tez vertex scope-42
+
+Tez vertex scope-37
+# Plan on vertex
+f: Local Rearrange[tuple]{int}(false) - scope-46 -> scope-42
+| |
+| Project[int][0] - scope-47
+|
+|---d: New For Each(false,false)[bag] - scope-24
+ | |
+ | Cast[int] - scope-19
+ | |
+ | |---Project[bytearray][0] - scope-18
+ | |
+ | Cast[chararray] - scope-22
+ | |
+ | |---Project[bytearray][1] - scope-21
+ |
+ |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-31
+# Plan on vertex
+f: Local Rearrange[tuple]{int}(false) - scope-49 -> scope-42
+| |
+| Project[int][0] - scope-50
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[chararray] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-32
+# Plan on vertex
+f: Local Rearrange[tuple]{int}(false) - scope-51 -> scope-42
+| |
+| Project[int][0] - scope-52
+|
+|---c: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][1] - scope-9
+ | |
+ | Cast[chararray] - scope-13
+ | |
+ | |---Project[bytearray][0] - scope-12
+ |
+ |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex group scope-43 <- [scope-37, scope-31, scope-32] -> scope-42
+# No plan on vertex group
+Tez vertex scope-42
+# Plan on vertex
+f: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-30
+|
+|---f: Package(Packager)[tuple]{int} - scope-27
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld Tue Apr 22 14:59:44 2014
@@ -70,20 +70,6 @@ d: Local Rearrange[tuple]{int}(false) -
Tez vertex group scope-55 <- [scope-29, scope-30] -> scope-35
# No plan on vertex group
Tez vertex scope-35
-# Combine plan on edge <scope-55>
-d: Local Rearrange[tuple]{int}(false) - scope-52 -> scope-35
-| |
-| Project[int][0] - scope-54
-|
-|---e: New For Each(false,false)[bag] - scope-41
- | |
- | Project[int][0] - scope-42
- | |
- | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-43
- | |
- | |---Project[bag][1] - scope-44
- |
- |---d: Package(CombinerPackager)[tuple]{int} - scope-47
# Combine plan on edge <scope-30>
d: Local Rearrange[tuple]{int}(false) - scope-52 -> scope-35
| |
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld Tue Apr 22 14:59:44 2014
@@ -2,114 +2,114 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-147
+# TEZ DAG plan: scope-149
#--------------------------------------------------
-Tez vertex scope-113 -> Tez vertex scope-115,
-Tez vertex scope-114 -> Tez vertex scope-115,
-Tez vertex scope-115 -> Tez vertex scope-137,Tez vertex scope-127,
-Tez vertex scope-127 -> Tez vertex scope-137,Tez vertex scope-119,
-Tez vertex scope-137 -> Tez vertex scope-141,
-Tez vertex scope-119 -> Tez vertex scope-141,
-Tez vertex scope-141
+Tez vertex scope-115 -> Tez vertex scope-117,
+Tez vertex scope-116 -> Tez vertex scope-117,
+Tez vertex scope-117 -> Tez vertex scope-139,Tez vertex scope-129,
+Tez vertex scope-129 -> Tez vertex scope-139,Tez vertex scope-121,
+Tez vertex scope-139 -> Tez vertex scope-143,
+Tez vertex scope-121 -> Tez vertex scope-143,
+Tez vertex scope-143
-Tez vertex scope-113
+Tez vertex scope-115
# Plan on vertex
-POValueOutputTez - scope-117 -> [scope-115]
+POValueOutputTez - scope-119 -> [scope-117]
|
-|---a: New For Each(false,false)[bag] - scope-91
+|---a: New For Each(false,false)[bag] - scope-93
| |
- | Cast[int] - scope-86
+ | Cast[int] - scope-88
| |
- | |---Project[bytearray][0] - scope-85
+ | |---Project[bytearray][0] - scope-87
| |
- | Cast[chararray] - scope-89
+ | Cast[chararray] - scope-91
| |
- | |---Project[bytearray][1] - scope-88
+ | |---Project[bytearray][1] - scope-90
|
- |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-84
-Tez vertex scope-114
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-86
+Tez vertex scope-116
# Plan on vertex
-POValueOutputTez - scope-118 -> [scope-115]
+POValueOutputTez - scope-120 -> [scope-117]
|
-|---c: New For Each(false,false)[bag] - scope-99
+|---c: New For Each(false,false)[bag] - scope-101
| |
- | Cast[int] - scope-94
+ | Cast[int] - scope-96
| |
- | |---Project[bytearray][1] - scope-93
+ | |---Project[bytearray][1] - scope-95
| |
- | Cast[chararray] - scope-97
+ | Cast[chararray] - scope-99
| |
- | |---Project[bytearray][0] - scope-96
+ | |---Project[bytearray][0] - scope-98
|
- |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-92
-Tez vertex scope-115
+ |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-94
+Tez vertex scope-117
# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-122 -> scope-127
+Local Rearrange[tuple]{tuple}(false) - scope-124 -> scope-129
| |
-| Constant(DummyVal) - scope-121
+| Constant(DummyVal) - scope-123
|
-|---New For Each(true,true)[tuple] - scope-126
+|---New For Each(true,true)[tuple] - scope-128
| |
- | Project[int][0] - scope-109
+ | Project[int][0] - scope-111
| |
- | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-125
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-127
| |
- | |---Project[tuple][*] - scope-124
+ | |---Project[tuple][*] - scope-126
|
- |---PoissonSample - scope-123
+ |---PoissonSample - scope-125
|
- |---Local Rearrange[tuple]{int}(false) - scope-120 -> scope-137
+ |---Local Rearrange[tuple]{int}(false) - scope-122 -> scope-139
| |
- | Project[int][0] - scope-109
+ | Project[int][0] - scope-111
|
- |---POShuffledValueInputTez - scope-116 <- [scope-114, scope-113]
-Tez vertex scope-127
+ |---POShuffledValueInputTez - scope-118 <- [scope-116, scope-115]
+Tez vertex scope-129
# Plan on vertex
-POValueOutputTez - scope-136 -> [scope-137, scope-119]
+POValueOutputTez - scope-138 -> [scope-139, scope-121]
|
-|---New For Each(false)[tuple] - scope-135
+|---New For Each(false)[tuple] - scope-137
| |
- | POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-134
+ | POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-136
| |
- | |---Project[tuple][*] - scope-133
+ | |---Project[tuple][*] - scope-135
|
- |---New For Each(false,false)[tuple] - scope-132
+ |---New For Each(false,false)[tuple] - scope-134
| |
- | Constant(1) - scope-131
+ | Constant(1) - scope-133
| |
- | Project[bag][1] - scope-129
+ | Project[bag][1] - scope-131
|
- |---Package(Packager)[tuple]{bytearray} - scope-128
-Tez vertex scope-137
+ |---Package(Packager)[tuple]{bytearray} - scope-130
+Tez vertex scope-139
# Plan on vertex
-POIdentityInOutTez - scope-138 <- scope-115 -> scope-141
+POIdentityInOutTez - scope-140 <- scope-117 -> scope-143
| |
-| Project[int][0] - scope-109
-Tez vertex scope-119
+| Project[int][0] - scope-111
+Tez vertex scope-121
# Plan on vertex
-Partition Rearrange[tuple]{int}(false) - scope-139 -> scope-141
+Partition Rearrange[tuple]{int}(false) - scope-141 -> scope-143
| |
-| Project[int][0] - scope-110
+| Project[int][0] - scope-112
|
-|---d: New For Each(false,false)[bag] - scope-108
+|---d: New For Each(false,false)[bag] - scope-110
| |
- | Cast[int] - scope-103
+ | Cast[int] - scope-105
| |
- | |---Project[bytearray][0] - scope-102
+ | |---Project[bytearray][0] - scope-104
| |
- | Cast[chararray] - scope-106
+ | Cast[chararray] - scope-108
| |
- | |---Project[bytearray][1] - scope-105
+ | |---Project[bytearray][1] - scope-107
|
- |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-101
-Tez vertex scope-141
+ |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-103
+Tez vertex scope-143
# Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-112
+e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-114
|
-|---New For Each(true,true)[tuple] - scope-145
+|---New For Each(true,true)[tuple] - scope-147
| |
- | Project[bag][1] - scope-143
+ | Project[bag][1] - scope-145
| |
- | Project[bag][2] - scope-144
+ | Project[bag][2] - scope-146
|
- |---Package(Packager)[tuple]{int} - scope-142
+ |---Package(Packager)[tuple]{int} - scope-144
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld Tue Apr 22 14:59:44 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-83
+# TEZ DAG plan: scope-85
#--------------------------------------------------
Tez vertex scope-29 -> Tez vertex scope-63,Tez vertex scope-64,
Tez vertex scope-30 -> Tez vertex scope-63,Tez vertex scope-64,
@@ -19,15 +19,15 @@ Local Rearrange[tuple]{tuple}(false) - s
| |
| Constant(DummyVal) - scope-68
|
-|---New For Each(true,true)[tuple] - scope-73
+|---New For Each(true,true)[tuple] - scope-74
| |
- | Project[int][0] - scope-70
+ | Project[int][0] - scope-71
| |
- | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-71
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-72
| |
- | |---Project[tuple][*] - scope-72
+ | |---Project[tuple][*] - scope-73
|
- |---PoissonSample - scope-69
+ |---PoissonSample - scope-70
|
|---Local Rearrange[tuple]{int}(false) - scope-65 -> scope-53
| |
@@ -46,23 +46,23 @@ Local Rearrange[tuple]{tuple}(false) - s
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-30
# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-76 -> scope-43
+Local Rearrange[tuple]{tuple}(false) - scope-77 -> scope-43
| |
-| Constant(DummyVal) - scope-77
+| Constant(DummyVal) - scope-78
|
-|---New For Each(true,true)[tuple] - scope-82
+|---New For Each(true,true)[tuple] - scope-84
| |
- | Project[int][0] - scope-79
+ | Project[int][0] - scope-81
| |
- | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-80
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-82
| |
- | |---Project[tuple][*] - scope-81
+ | |---Project[tuple][*] - scope-83
|
- |---PoissonSample - scope-78
+ |---PoissonSample - scope-80
|
- |---Local Rearrange[tuple]{int}(false) - scope-74 -> scope-53
+ |---Local Rearrange[tuple]{int}(false) - scope-75 -> scope-53
| |
- | Project[int][0] - scope-75
+ | Project[int][0] - scope-76
|
|---c: New For Each(false,false)[bag] - scope-15
| |