You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/01 21:49:10 UTC
svn commit: r1583768 [1/2] - in /pig/branches/tez: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executi...
Author: rohini
Date: Tue Apr 1 19:49:09 2014
New Revision: 1583768
URL: http://svn.apache.org/r1583768
Log:
PIG-3835: Improve performance of union (rohini)
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
Removed:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
Modified:
pig/branches/tez/src/org/apache/pig/PigConfiguration.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Tue Apr 1 19:49:09 2014
@@ -81,6 +81,11 @@ public class PigConfiguration {
public static final String OPT_ACCUMULATOR = "opt.accumulator";
/**
+ * This key is used to enable union optimization.
+ */
+ public static final String TEZ_OPT_UNION = "pig.tez.opt.union";
+
+ /**
* This key is used to define whether to reuse AM in Tez jobs.
*/
public static final String TEZ_SESSION_REUSE = "pig.tez.session.reuse";
@@ -186,7 +191,7 @@ public class PigConfiguration {
* of nested distinct or sort
*/
public static final String PIG_EXEC_NO_SECONDARY_KEY = "pig.exec.nosecondarykey";
-
+
/**
* This key used to control the sample size of RandomeSampleLoader for
* order-by. The default value is 100 rows per task.
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Tue Apr 1 19:49:09 2014
@@ -34,13 +34,13 @@ import org.apache.pig.pen.util.ExampleTu
public class POLimit extends PhysicalOperator {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
// Counts for outputs processed
private long soFar = 0;
-
+
// Number of limited outputs
long mLimit;
@@ -62,11 +62,11 @@ public class POLimit extends PhysicalOpe
public POLimit(OperatorKey k, int rp, List<PhysicalOperator> inputs) {
super(k, rp, inputs);
}
-
+
public void setLimit(long limit) {
mLimit = limit;
}
-
+
public long getLimit() {
return mLimit;
}
@@ -80,8 +80,8 @@ public class POLimit extends PhysicalOpe
}
/**
- * Counts the number of tuples processed into static variable soFar, if the number of tuples processed reach the
- * limit, return EOP; Otherwise, return the tuple
+ * Counts the number of tuples processed into static variable soFar, if the number of tuples processed reach the
+ * limit, return EOP; Otherwise, return the tuple
*/
@Override
public Result getNextTuple() throws ExecException {
@@ -117,12 +117,12 @@ public class POLimit extends PhysicalOpe
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
break;
-
+
illustratorMarkup(inp.result, null, 0);
// illustrator ignore LIMIT before the post processing
if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar>=mLimit)
inp.returnStatus = POStatus.STATUS_EOP;
-
+
soFar++;
break;
}
@@ -161,11 +161,13 @@ public class POLimit extends PhysicalOpe
NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
this.requestedParallelism, this.inputs);
newLimit.mLimit = this.mLimit;
- newLimit.expressionPlan = this.expressionPlan.clone();
+ if (this.expressionPlan != null) {
+ newLimit.expressionPlan = this.expressionPlan.clone();
+ }
newLimit.addOriginalLocation(alias, getOriginalLocations());
return newLimit;
}
-
+
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
if(illustrator != null) {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Tue Apr 1 19:49:09 2014
@@ -24,7 +24,7 @@ public class Packager implements Seriali
protected DataBag[] bags;
public static enum PackageType {
- GROUP, JOIN, UNION
+ GROUP, JOIN
};
// The key being worked on
@@ -273,6 +273,7 @@ public class Packager implements Seriali
this.numInputs = numInputs;
}
+ @Override
public Packager clone() throws CloneNotSupportedException {
Packager clone = (Packager) super.clone();
clone.setNumInputs(numInputs);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Tue Apr 1 19:49:09 2014
@@ -18,13 +18,11 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -155,17 +153,20 @@ public class MultiQueryOptimizerTez exte
plan.disconnect(splittee, succTezOperator);
TezCompilerUtil.connect(plan, splitter, succTezOperator, edge);
- for (TezOperator succ : succs) {
- try {
- List<POFRJoinTez> frJoins = PlanHelper.getPhysicalOperators(succ.plan, POFRJoinTez.class);
- for (POFRJoinTez frJoin : frJoins) {
- if (frJoin.getInputKeys().contains(splittee.getOperatorKey().toString())) {
- frJoin.getInputKeys().set(frJoin.getInputKeys().indexOf(splittee.getOperatorKey().toString()),
- splitter.getOperatorKey().toString());
- }
- }
- } catch (VisitorException e) {
- throw new PlanException(e);
+ try {
+ List<TezInput> inputs = PlanHelper.getPhysicalOperators(succTezOperator.plan, TezInput.class);
+ for (TezInput input : inputs) {
+ input.replaceInput(splittee.getOperatorKey().toString(),
+ splitter.getOperatorKey().toString());
+ }
+ } catch (VisitorException e) {
+ throw new PlanException(e);
+ }
+
+ if (succTezOperator.isUnion()) {
+ int index = succTezOperator.getUnionPredecessors().indexOf(splittee.getOperatorKey());
+ if (index > -1) {
+ succTezOperator.getUnionPredecessors().set(index, splitter.getOperatorKey());
}
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java Tue Apr 1 19:49:09 2014
@@ -47,7 +47,7 @@ import com.google.common.collect.Lists;
* POFRJoinTez is used on the backend to load replicated table from Tez
* ShuffleUnorderedKVInput and load fragmented table from data pipeline.
*/
-public class POFRJoinTez extends POFRJoin implements TezLoad {
+public class POFRJoinTez extends POFRJoin implements TezInput {
private static final Log log = LogFactory.getLog(POFRJoinTez.class);
private static final long serialVersionUID = 1L;
@@ -64,6 +64,18 @@ public class POFRJoinTez extends POFRJoi
}
@Override
+ public String[] getTezInputs() {
+ return inputKeys.toArray(new String[inputKeys.size()]);
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ // Rewrite matching keys in place so each keeps its position in inputKeys;
+ // remove()+add() would move the key to the end and reorder the join inputs.
+ java.util.Collections.replaceAll(inputKeys, oldInputKey, newInputKey);
+ }
+
+ @Override
public void addInputsToSkip(Set<String> inputsToSkip) {
String cacheKey = "replicatemap-" + getOperatorKey().toString();
Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
@@ -73,7 +85,6 @@ public class POFRJoinTez extends POFRJoi
}
}
- @SuppressWarnings("rawtypes")
@Override
public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
throws ExecException {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java Tue Apr 1 19:49:09 2014
@@ -32,10 +32,10 @@ import org.apache.pig.impl.io.PigNullabl
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
/**
* POIdentityInOutTez is used to pass through tuples as is to next vertex from
@@ -44,7 +44,7 @@ import org.apache.tez.runtime.library.in
* previous vertex data uses POIdentityInOutTez.
*/
@InterfaceAudience.Private
-public class POIdentityInOutTez extends POLocalRearrangeTez implements TezLoad, TezOutput {
+public class POIdentityInOutTez extends POLocalRearrangeTez implements TezInput, TezOutput {
private static final long serialVersionUID = 1L;
private String inputKey;
@@ -62,6 +62,18 @@ public class POIdentityInOutTez extends
}
@Override
+ public String[] getTezInputs() {
+ return new String[] { inputKey };
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(inputKey)) {
+ inputKey = newInputKey;
+ }
+ }
+
+ @Override
public void addInputsToSkip(Set<String> inputsToSkip) {
}
@@ -73,12 +85,12 @@ public class POIdentityInOutTez extends
throw new ExecException("Input from vertex " + inputKey + " is missing");
}
try {
- if (input instanceof ShuffledMergedInput) {
- shuffleInput = true;
- ShuffledMergedInput smInput = (ShuffledMergedInput) input;
- shuffleReader = smInput.getReader();
+ Reader r = input.getReader();
+ if (r instanceof KeyValueReader) {
+ reader = (KeyValueReader) r;
} else {
- reader = (KeyValueReader) input.getReader();
+ shuffleInput = true;
+ shuffleReader = (KeyValuesReader) r;
}
} catch (Exception e) {
throw new ExecException(e);
@@ -141,7 +153,7 @@ public class POIdentityInOutTez extends
@Override
public String name() {
- return "POIdentityInOutTez - " + mKey.toString() + "\t->\t " + outputKey;
+ return "POIdentityInOutTez - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Tue Apr 1 19:49:09 2014
@@ -48,8 +48,6 @@ public class POLocalRearrangeTez extends
protected String outputKey;
protected transient KeyValueWriter writer;
- // Tez union is implemented as LR + Pkg
- protected boolean isUnion = false;
protected boolean isSkewedJoin = false;
public POLocalRearrangeTez(OperatorKey k) {
@@ -64,7 +62,6 @@ public class POLocalRearrangeTez extends
super(copy);
if (copy instanceof POLocalRearrangeTez) {
POLocalRearrangeTez copyTez = (POLocalRearrangeTez) copy;
- this.isUnion = copyTez.isUnion;
this.isSkewedJoin = copyTez.isSkewedJoin;
this.outputKey = copyTez.outputKey;
}
@@ -78,14 +75,6 @@ public class POLocalRearrangeTez extends
this.outputKey = outputKey;
}
- public boolean isUnion() {
- return isUnion;
- }
-
- public void setUnion(boolean isUnion) {
- this.isUnion = isUnion;
- }
-
public boolean isSkewedJoin() {
return isSkewedJoin;
}
@@ -95,6 +84,18 @@ public class POLocalRearrangeTez extends
}
@Override
+ public String[] getTezOutputs() {
+ return new String[] { outputKey };
+ }
+
+ @Override
+ public void replaceOutput(String oldOutputKey, String newOutputKey) {
+ if (oldOutputKey.equals(outputKey)) {
+ outputKey = newOutputKey;
+ }
+ }
+
+ @Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
Configuration conf) throws ExecException {
LogicalOutput logicalOut = outputs.get(outputKey);
@@ -136,18 +137,9 @@ public class POLocalRearrangeTez extends
case POStatus.STATUS_OK:
if (illustrator == null) {
Tuple result = (Tuple) res.result;
- Byte index = (Byte)result.get(0);
- PigNullableWritable key = null;
- NullableTuple val = null;
- if (isUnion) {
- // Use the whole tuple as key and set value to null
- key = HDataType.getWritableComparableTypes(result.get(1), keyType);
- val = new NullableTuple();
- val.setNull(true);
- } else {
- key = HDataType.getWritableComparableTypes(result.get(1), keyType);
- val = new NullableTuple((Tuple)result.get(2));
- }
+ Byte index = (Byte) result.get(0);
+ PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+ NullableTuple val = new NullableTuple((Tuple)result.get(2));
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Tue Apr 1 19:49:09 2014
@@ -39,14 +39,13 @@ import org.apache.pig.impl.io.PigNullabl
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-public class POShuffleTezLoad extends POPackage implements TezLoad {
+public class POShuffleTezLoad extends POPackage implements TezInput {
private static final long serialVersionUID = 1L;
protected List<String> inputKeys = new ArrayList<String>();
- protected List<ShuffledMergedInput> inputs = new ArrayList<ShuffledMergedInput>();
+ protected List<LogicalInput> inputs = new ArrayList<LogicalInput>();
protected List<KeyValuesReader> readers = new ArrayList<KeyValuesReader>();
private boolean[] finished;
@@ -60,6 +59,18 @@ public class POShuffleTezLoad extends PO
}
@Override
+ public String[] getTezInputs() {
+ return inputKeys.toArray(new String[inputKeys.size()]);
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ // Rewrite matching keys in place so each keeps its position in inputKeys
+ // (and thus in the inputs/readers lists built from it in attachInputs).
+ java.util.Collections.replaceAll(inputKeys, oldInputKey, newInputKey);
+ }
+
+ @Override
public void addInputsToSkip(Set<String> inputsToSkip) {
}
@@ -71,11 +82,8 @@ public class POShuffleTezLoad extends PO
try {
for (String key : inputKeys) {
LogicalInput input = inputs.get(key);
- if (input instanceof ShuffledMergedInput) {
- ShuffledMergedInput smInput = (ShuffledMergedInput) input;
- this.inputs.add(smInput);
- this.readers.add(smInput.getReader());
- }
+ this.inputs.add(input);
+ this.readers.add((KeyValuesReader)input.getReader());
}
// We need to adjust numInputs because it's possible for both
@@ -93,7 +101,7 @@ public class POShuffleTezLoad extends PO
for (int i = 0; i < numInputs; i++) {
finished[i] = !readers.get(i).next();
}
- } catch (IOException e) {
+ } catch (Exception e) {
throw new ExecException(e);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java Tue Apr 1 19:49:09 2014
@@ -36,7 +36,7 @@ import org.apache.tez.runtime.library.ap
/**
* POSimpleTezLoad is used on the backend to read tuples from a Tez MRInput
*/
-public class POSimpleTezLoad extends POLoad implements TezLoad {
+public class POSimpleTezLoad extends POLoad implements TezInput {
private static final long serialVersionUID = 1L;
private String inputKey;
@@ -48,6 +48,18 @@ public class POSimpleTezLoad extends POL
}
@Override
+ public String[] getTezInputs() {
+ return new String[] { inputKey };
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(inputKey)) {
+ inputKey = newInputKey;
+ }
+ }
+
+ @Override
public void addInputsToSkip(Set<String> inputsToSkip) {
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java Tue Apr 1 19:49:09 2014
@@ -39,20 +39,40 @@ public class POStoreTez extends POStore
private static final long serialVersionUID = 1L;
private transient MROutput output;
private transient KeyValueWriter writer;
+ private String outputKey;
public POStoreTez(OperatorKey k) {
super(k);
+ this.outputKey = k.toString();
}
public POStoreTez(POStore copy) {
super(copy);
+ this.outputKey = copy.getOperatorKey().toString();
+ }
+
+ public String getOutputKey() {
+ return outputKey;
+ }
+
+ public void setOutputKey(String outputKey) {
+ this.outputKey = outputKey;
+ }
+
+ @Override
+ public String[] getTezOutputs() {
+ return new String[] { outputKey };
+ }
+
+ @Override
+ public void replaceOutput(String oldOutputKey, String newOutputKey) {
}
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
Configuration conf)
throws ExecException {
- LogicalOutput logicalOut = outputs.get(getOperatorKey().toString());
+ LogicalOutput logicalOut = outputs.get(outputKey);
if (logicalOut == null || !(logicalOut instanceof MROutput)) {
throw new ExecException("POStoreTez only accepts MROutput. key = "
+ getOperatorKey() + ", outputs = " + outputs);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java Tue Apr 1 19:49:09 2014
@@ -41,7 +41,7 @@ import org.apache.tez.runtime.library.ap
* POValueInputTez is used read tuples from a Tez Intermediate output from a 1-1
* edge
*/
-public class POValueInputTez extends PhysicalOperator implements TezLoad {
+public class POValueInputTez extends PhysicalOperator implements TezInput {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(POValueInputTez.class);
@@ -57,6 +57,18 @@ public class POValueInputTez extends Phy
}
@Override
+ public String[] getTezInputs() {
+ return new String[] { inputKey };
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(inputKey)) {
+ inputKey = newInputKey;
+ }
+ }
+
+ @Override
public void addInputsToSkip(Set<String> inputsToSkip) {
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Tue Apr 1 19:49:09 2014
@@ -29,6 +29,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -59,6 +60,18 @@ public class POValueOutputTez extends Ph
}
@Override
+ public String[] getTezOutputs() {
+ return outputKeys.toArray(new String[outputKeys.size()]);
+ }
+
+ @Override
+ public void replaceOutput(String oldOutputKey, String newOutputKey) {
+ if (outputKeys.remove(oldOutputKey)) {
+ outputKeys.add(newOutputKey); // register the new key in place of the removed one
+ }
+ }
+
+ @Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
Configuration conf) throws ExecException {
writers = new ArrayList<KeyValueWriter>();
@@ -147,4 +160,24 @@ public class POValueOutputTez extends Ph
}
}
+ //TODO: Remove after PIG-3775/TEZ-661
+ public static class EmptyWritableComparator implements RawComparator<EmptyWritable> {
+
+ @Override
+ public int compare(EmptyWritable o1, EmptyWritable o2) {
+ return 0;
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ // 0 - Reverses the input order. 0 groups all values into
+ // single record on reducer which is additional overhead.
+ // -1, 1 - Returns input in random order. But comparator is invoked way more
+ // times than 0. Compared to 1, -1 invokes comparator even more.
+ // Going with 0 for now. After TEZ-661 this will not be required any more.
+ return 0;
+ }
+
+ }
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Tue Apr 1 19:49:09 2014
@@ -40,8 +40,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.data.BinSedesTuple;
import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
@@ -176,9 +176,9 @@ public class PigProcessor implements Log
inputsToSkip.add(sampleVertex);
}
- LinkedList<TezLoad> tezLds = PlanHelper.getPhysicalOperators(execPlan, TezLoad.class);
- for (TezLoad tezLd : tezLds){
- tezLd.addInputsToSkip(inputsToSkip);
+ LinkedList<TezInput> tezInputs = PlanHelper.getPhysicalOperators(execPlan, TezInput.class);
+ for (TezInput tezInput : tezInputs){
+ tezInput.addInputsToSkip(inputsToSkip);
}
LinkedList<ReadScalarsTez> scalarInputs = new LinkedList<ReadScalarsTez>();
@@ -201,8 +201,8 @@ public class PigProcessor implements Log
}
}
- for (TezLoad tezLd : tezLds){
- tezLd.attachInputs(inputs, conf);
+ for (TezInput tezInput : tezInputs){
+ tezInput.attachInputs(inputs, conf);
}
for (ReadScalarsTez scalarInput: scalarInputs) {
@@ -272,7 +272,7 @@ public class PigProcessor implements Log
Object val = reader.getCurrentValue();
if (val != null) {
// Sample is not empty
- BinSedesTuple t = (BinSedesTuple) val;
+ Tuple t = (Tuple) val;
sampleMap = (Map<String, Object>) t.get(0);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java Tue Apr 1 19:49:09 2014
@@ -30,7 +30,7 @@ import org.apache.pig.data.Tuple;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;
-public class ReadScalarsTez extends EvalFunc<Object> implements TezLoad {
+public class ReadScalarsTez extends EvalFunc<Object> implements TezInput {
private static final Log LOG = LogFactory.getLog(ReadScalarsTez.class);
private String inputKey;
private transient Tuple t;
@@ -41,6 +41,18 @@ public class ReadScalarsTez extends Eval
}
@Override
+ public String[] getTezInputs() {
+ return new String[] { inputKey };
+ }
+ // Only the scalar's single input key is rewritten, and only when it equals oldInputKey.
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(inputKey)) {
+ inputKey = newInputKey;
+ }
+ }
+
+ @Override
public void addInputsToSkip(Set<String> inputsToSkip) {
String cacheKey = "scalar-" + inputKey;
Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java Tue Apr 1 19:49:09 2014
@@ -19,13 +19,12 @@ package org.apache.pig.backend.hadoop.ex
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pig.impl.io.PigNullableWritable;
-public class RoundRobinPartitioner extends Partitioner<PigNullableWritable, Writable> {
+public class RoundRobinPartitioner extends Partitioner<Writable, Writable> {
private int num = 0;
@Override
- public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
+ public int getPartition(Writable key, Writable value, int numPartitions) {
num = ++num % numPartitions;
return num;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Apr 1 19:49:09 2014
@@ -83,8 +83,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.operators.POCounterStatsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.operators.POCounterTez;
import org.apache.pig.backend.hadoop.executionengine.tez.operators.PORankTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.operators.POShuffledValueInputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
-import org.apache.pig.data.BinSedesTuple;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -278,9 +278,7 @@ public class TezCompiler extends PhyPlan
TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, from, tezOp);
//TODO shared edge once support is available in Tez
- edge.dataMovementType = DataMovementType.BROADCAST;
- edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
}
}
}
@@ -353,15 +351,11 @@ public class TezCompiler extends PhyPlan
// Connect splitter to splittee
TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, storeTezOper, storeOnlyTezOperator);
- edge.dataMovementType = DataMovementType.ONE_TO_ONE;
- edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
storeOnlyTezOperator.setRequestedParallelismByReference(storeTezOper);
edge = TezCompilerUtil.connect(tezPlan, storeTezOper, curTezOp);
- edge.dataMovementType = DataMovementType.ONE_TO_ONE;
- edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
curTezOp.setRequestedParallelismByReference(storeTezOper);
return;
@@ -1073,9 +1067,7 @@ public class TezCompiler extends PhyPlan
try{
nonBlocking(op);
phyToTezOpMap.put(op, curTezOp);
- if (op.getPkgr().getPackageType() == PackageType.UNION) {
- curTezOp.markUnion();
- } else if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+ if (op.getPkgr().getPackageType() == PackageType.JOIN) {
curTezOp.markRegularJoin();
} else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
if (op.getNumInps() == 1) {
@@ -1141,11 +1133,7 @@ public class TezCompiler extends PhyPlan
// Connect counterOper vertex to rankOper vertex by 1-1 edge
rankOper.setRequestedParallelismByReference(counterOper);
TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, counterOper, rankOper);
- edge.dataMovementType = DataMovementType.ONE_TO_ONE;
- edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
- edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
- edge.setIntermediateOutputValueClass(BinSedesTuple.class.getName());
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
counterTez.setTuplesOutputKey(rankOper.getOperatorKey().toString());
rankTez.setTuplesInputKey(counterOper.getOperatorKey().toString());
@@ -1161,12 +1149,8 @@ public class TezCompiler extends PhyPlan
// Connect statsOper vertex to rankOper vertex by Broadcast edge
edge = TezCompilerUtil.connect(tezPlan, statsOper, rankOper);
- edge.dataMovementType = DataMovementType.BROADCAST;
- edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
- edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
- // Map of task id, offset count based on total number of records
- edge.setIntermediateOutputValueClass(BinSedesTuple.class.getName());
+ // Map of task id, offset count based on total number of records is in the value
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
counterStatsTez.setOutputKey(rankOper.getOperatorKey().toString());
rankTez.setStatsInputKey(statsOper.getOperatorKey().toString());
@@ -1409,9 +1393,7 @@ public class TezCompiler extends PhyPlan
// Configure broadcast edges for distribution map
edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
- edge.dataMovementType = DataMovementType.BROADCAST;
- edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
// Configure skewed partitioner for join
@@ -1953,9 +1935,7 @@ public class TezCompiler extends PhyPlan
lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
edge = TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
- edge.dataMovementType = DataMovementType.BROADCAST;
- edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
POValueOutputTez sampleOut = (POValueOutputTez)quantJobParallelismPair.first.plan.getLeaves().get(0);
sampleOut.addOutputKey(sortOpers[0].getOperatorKey().toString());
sortOpers[0].sampleOperator = quantJobParallelismPair.first;
@@ -1998,9 +1978,7 @@ public class TezCompiler extends PhyPlan
output.addOutputKey(curTezOp.getOperatorKey().toString());
TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
//TODO shared edge once support is available in Tez
- edge.dataMovementType = DataMovementType.ONE_TO_ONE;
- edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
curTezOp.setRequestedParallelismByReference(splitOp);
POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
input.setInputKey(splitOp.getOperatorKey().toString());
@@ -2044,45 +2022,31 @@ public class TezCompiler extends PhyPlan
@Override
public void visitUnion(POUnion op) throws VisitorException {
try {
- // Add alias vertex. This will be converted to VertexGroup by
- // TezDagBuilder.
- TezOperator newTezOp = getTezOp();
- tezPlan.add(newTezOp);
- POLocalRearrangeTez[] outputs = new POLocalRearrangeTez[compiledInputs.length];
+ // Without VertexGroup (UnionOptimizer), there is an extra union vertex
+ // which unions input from the two predecessor vertices
+ TezOperator unionTezOp = getTezOp();
+ tezPlan.add(unionTezOp);
+ unionTezOp.markUnion();
+ unionTezOp.setRequestedParallelism(op.getRequestedParallelism());
+ POShuffledValueInputTez unionInput = new POShuffledValueInputTez(OperatorKey.genOpKey(scope));
+ unionTezOp.plan.addAsLeaf(unionInput);
+
+ POValueOutputTez[] outputs = new POValueOutputTez[compiledInputs.length];
for (int i = 0; i < compiledInputs.length; i++) {
TezOperator prevTezOp = compiledInputs[i];
- TezCompilerUtil.connect(tezPlan, prevTezOp, newTezOp);
- // TODO: Use POValueOutputTez instead of POLocalRearrange and
- // unsorted shuffle with TEZ-661 and PIG-3775.
- outputs[i] = localRearrangeFactory.create();
- outputs[i].setUnion(true);
+ // Some predecessors of union need not be part of the union (For eg: replicated join).
+ // So mark predecessors that are input to the union operation.
+ unionTezOp.addUnionPredecessor(prevTezOp.getOperatorKey());
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, prevTezOp, unionTezOp);
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.SCATTER_GATHER);
+ outputs[i] = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ outputs[i].addOutputKey(unionTezOp.getOperatorKey().toString());
+ unionInput.addInputKey(prevTezOp.getOperatorKey().toString());
prevTezOp.plan.addAsLeaf(outputs[i]);
prevTezOp.setClosed(true);
}
- OperatorKey unionKey = newTezOp.getOperatorKey();
- newTezOp.markUnion();
- curTezOp = newTezOp;
-
- // Start a new TezOp so that the successor in physical plan can be
- // added to it.
- newTezOp = getTezOp();
- tezPlan.add(newTezOp);
- tezPlan.connect(curTezOp, newTezOp);
-
- // Connect the POValueOutputTezs in the predecessor vertices to the
- // succeeding vertex.
- for (int i = 0; i < outputs.length; i++) {
- outputs[i].setOutputKey(newTezOp.getOperatorKey().toString());
- }
- // The first operator in the succeeding vertex must be
- // POVertexGroupInputTez.
- POVertexGroupInputTez grpInput = new POVertexGroupInputTez(newTezOp.getOperatorKey());
- grpInput.setInputKey(unionKey.toString());
- grpInput.setAlias(op.getAlias());
- newTezOp.plan.add(grpInput);
- curTezOp = newTezOp;
- curTezOp.setRequestedParallelism(op.getRequestedParallelism());
+ curTezOp = unionTezOp;
phyToTezOpMap.put(op, curTezOp);
} catch (Exception e) {
int errCode = 2034;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Apr 1 19:49:09 2014
@@ -76,7 +76,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
-import org.apache.pig.data.BinSedesTuple;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -94,8 +93,6 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
@@ -111,9 +108,8 @@ import org.apache.tez.mapreduce.hadoop.M
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
/**
* A visitor to construct DAG out of Tez plan.
@@ -148,7 +144,7 @@ public class TezDagBuilder extends TezOp
// Construct vertex for the current Tez operator
Vertex to = null;
try {
- if (!tezOp.isAliasVertex()) {
+ if (!tezOp.isVertexGroup()) {
boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false;
to = newVertex(tezOp, isMap);
dag.addVertex(to);
@@ -170,16 +166,16 @@ public class TezDagBuilder extends TezOp
// must have already been created.
TezOperator pred = predecessors.get(i);
try {
- if (pred.isAliasVertex()) {
- VertexGroup from = pred.getVertexGroup();
- GroupInputEdge edge = newGroupInputEdge(from, to);
+ if (pred.isVertexGroup()) {
+ VertexGroup from = pred.getVertexGroupInfo().getVertexGroup();
+ GroupInputEdge edge = newGroupInputEdge(pred, tezOp, from, to);
dag.addEdge(edge);
} else {
Vertex from = dag.getVertex(pred.getOperatorKey().toString());
- EdgeProperty prop = newEdge(pred, tezOp);
- if (tezOp.isAliasVertex()) {
+ if (tezOp.isVertexGroup()) {
groupMembers[i] = from;
} else {
+ EdgeProperty prop = newEdge(pred, tezOp);
Edge edge = new Edge(from, to, prop);
dag.addEdge(edge);
}
@@ -190,28 +186,30 @@ public class TezDagBuilder extends TezOp
}
}
- if (tezOp.isAliasVertex()) {
+ if (tezOp.isVertexGroup()) {
String groupName = tezOp.getOperatorKey().toString();
- tezOp.setVertexGroup(dag.createVertexGroup(groupName, groupMembers));
+ VertexGroup vertexGroup = dag.createVertexGroup(groupName, groupMembers);
+ tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup);
+ POStore store = tezOp.getVertexGroupInfo().getStore();
+ if (store != null) {
+ vertexGroup.addOutput(store.getOperatorKey().toString(),
+ tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
+ MROutputCommitter.class);
+ }
}
}
}
- private GroupInputEdge newGroupInputEdge(VertexGroup from, Vertex to)
- throws IOException {
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
- setIntermediateInputKeyValue(DataType.TUPLE, conf, null);
- setIntermediateOutputKeyValue(DataType.TUPLE, conf, null);
- MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
+ private GroupInputEdge newGroupInputEdge(TezOperator fromOp,
+ TezOperator toOp, VertexGroup from, Vertex to) throws IOException {
- return new GroupInputEdge(from, to, new EdgeProperty(
- DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- new OutputDescriptor(OnFileSortedOutput.class.getName())
- .setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
- new InputDescriptor(ShuffledMergedInput.class.getName())
- .setUserPayload(TezUtils.createUserPayloadFromConf(conf))),
- new InputDescriptor(ConcatenatedMergedKeyValuesInput.class.getName()));
+ EdgeProperty edgeProperty = newEdge(fromOp, toOp);
+ // SCATTER_GATHER edges use ConcatenatedMergedKeyValuesInput as the group input, other edges ConcatenatedMergedKeyValueInput; the group input reuses the edge destination's payload.
+ String groupInputClass = edgeProperty.getDataMovementType().equals(
+ DataMovementType.SCATTER_GATHER) ? ConcatenatedMergedKeyValuesInput.class
+ .getName() : ConcatenatedMergedKeyValueInput.class.getName();
+ return new GroupInputEdge(from, to, edgeProperty,
+ new InputDescriptor(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
}
/**
@@ -249,25 +247,6 @@ public class TezDagBuilder extends TezOp
}
}
- //TODO: Remove this and set the classes on edge in TezCompiler
- List<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(from.plan,
- POValueOutputTez.class);
- if (!valueOutputs.isEmpty()) {
- POValueOutputTez valueOutput = valueOutputs.get(0);
- for (String outputKey : valueOutput.outputKeys) {
- if (outputKey.equals(to.getOperatorKey().toString())) {
- conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
- POValueOutputTez.EmptyWritable.class.getName());
- conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
- BinSedesTuple.class.getName());
- conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
- POValueOutputTez.EmptyWritable.class.getName());
- conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
- BinSedesTuple.class.getName());
- }
- }
- }
-
conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
MRPartitioner.class.getName());
@@ -285,6 +264,13 @@ public class TezDagBuilder extends TezOp
edge.getIntermediateOutputValueClass());
}
+ if (edge.getIntermediateOutputKeyComparatorClass() != null) {
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+ edge.getIntermediateOutputKeyComparatorClass());
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+ edge.getIntermediateOutputKeyComparatorClass());
+ }
+
conf.setBoolean("mapred.mapper.new-api", true);
conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
@@ -442,11 +428,15 @@ public class TezDagBuilder extends TezOp
if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) {
// skip sample vertex input
} else {
+ String inputKey = pred.getOperatorKey().toString();
+ if (pred.isVertexGroup()) {
+ pred = mPlan.getOperator(pred.getVertexGroupPredecessors().get(0));
+ }
LinkedList<POLocalRearrangeTez> lrs =
PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
if (lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
- localRearrangeMap.put((int)lr.getIndex(), pred.getOperatorKey().toString());
+ localRearrangeMap.put((int)lr.getIndex(), inputKey);
}
}
}
@@ -554,10 +544,21 @@ public class TezDagBuilder extends TezOp
outputPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
ObjectSerializer.serialize(singleStore));
+ OutputDescriptor storeOutDescriptor = new OutputDescriptor(
+ MROutput.class.getName()).setUserPayload(TezUtils
+ .createUserPayloadFromConf(outputPayLoad));
+ if (tezOp.getVertexGroupStores() != null
+ && tezOp.getVertexGroupStores().containsKey(store.getOperatorKey())) {
+ // The output of this store is added to the vertex group (from its
+ // getStoreOutputDescriptor()) when the group is created, not to this
+ // vertex. Hand the descriptor to the group instead of dropping it.
+ TezOperator vertexGroupOp = mPlan.getOperator(
+ tezOp.getVertexGroupStores().get(store.getOperatorKey()));
+ vertexGroupOp.getVertexGroupInfo().setStoreOutputDescriptor(storeOutDescriptor);
+ continue;
+ }
vertex.addOutput(store.getOperatorKey().toString(),
- new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(TezUtils.createUserPayloadFromConf(outputPayLoad)),
- MROutputCommitter.class);
+ storeOutDescriptor, MROutputCommitter.class);
}
if (stores.size() > 0) {
@@ -651,6 +652,7 @@ public class TezDagBuilder extends TezOp
IOException {
LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
tezOp.plan, POStore.class);
+
if (stores.size() > 0) {
ArrayList<POStore> storeLocations = new ArrayList<POStore>();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java Tue Apr 1 19:49:09 2014
@@ -49,6 +49,7 @@ public class TezEdgeDescriptor {
private String intermediateOutputKeyClass;
private String intermediateOutputValueClass;
+ private String intermediateOutputKeyComparatorClass;
public TezEdgeDescriptor() {
combinePlan = new PhysicalPlan();
@@ -98,4 +99,13 @@ public class TezEdgeDescriptor {
this.intermediateOutputValueClass = intermediateOutputValueClass;
}
+ public String getIntermediateOutputKeyComparatorClass() {
+ return intermediateOutputKeyComparatorClass;
+ }
+ // When non-null, TezDagBuilder sets this class as both the intermediate output and input key comparator.
+ public void setIntermediateOutputKeyComparatorClass(
+ String intermediateOutputKeyComparatorClass) {
+ this.intermediateOutputKeyComparatorClass = intermediateOutputKeyComparatorClass;
+ }
+
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java?rev=1583768&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java Tue Apr 1 19:49:09 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.tez.runtime.api.LogicalInput;
+
+/**
+ * This interface is implemented by PhysicalOperators that can have Tez inputs
+ * attached directly to the operator.
+ */
+
+public interface TezInput {
+ /** Returns the keys of the Tez inputs this operator reads from. */
+ public String[] getTezInputs();
+ /** Replaces input key oldInputKey with newInputKey; expected to be a no-op if oldInputKey is not an input of this operator. */
+ public void replaceInput(String oldInputKey, String newInputKey);
+
+ /**
+ * Adds to inputsToSkip the keys of inputs whose download can be skipped because the data is already available in the vertex cache
+ *
+ * @param inputsToSkip set that the keys of skippable inputs are added to
+ */
+ public void addInputsToSkip(Set<String> inputsToSkip);
+ /** Attaches the Tez LogicalInputs (presumably keyed by the keys from getTezInputs()) to this operator. */
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf) throws ExecException;
+
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Tue Apr 1 19:49:09 2014
@@ -32,6 +32,7 @@ import org.apache.pig.backend.BackendExc
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
@@ -149,6 +150,7 @@ public class TezLauncher extends Launche
throws PlanException, IOException, VisitorException {
TezCompiler comp = new TezCompiler(php, pc, tezResourceManager);
TezOperPlan tezPlan = comp.compile();
+
boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
PigConfiguration.PROP_NO_COMBINER, "false"));
@@ -188,6 +190,14 @@ public class TezLauncher extends Launche
accum.visit();
}
+ boolean isUnionOpt = "true".equalsIgnoreCase(pc.getProperties()
+ .getProperty(PigConfiguration.TEZ_OPT_UNION, "false"));
+ // Use VertexGroup in Tez
+ if (isUnionOpt) {
+ UnionOptimizer uo = new UnionOptimizer(tezPlan);
+ uo.visit();
+ }
+
return comp.getPlanContainer();
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Tue Apr 1 19:49:09 2014
@@ -29,6 +29,7 @@ import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.VisitorException;
@@ -127,11 +128,25 @@ public class TezOperPlan extends Operato
@Override
public void remove(TezOperator op) {
- //TODO Cleanup outEdges of predecessors and inEdges of successors
- //TezDAGBuilder would not create the edge. So low priority
+ // The remove method does not replace output and input keys in TezInput
+ // and TezOutput. That has to be handled separately.
+ for (OperatorKey opKey : op.outEdges.keySet()) {
+ getOperator(opKey).inEdges.remove(op.getOperatorKey());
+ }
+ for (OperatorKey opKey : op.inEdges.keySet()) {
+ getOperator(opKey).outEdges.remove(op.getOperatorKey());
+ }
super.remove(op);
}
+ @Override
+ public boolean disconnect(TezOperator from, TezOperator to) {
+ // Keep the per-operator edge maps in sync with the plan: the edge is an
+ // outgoing edge of 'from' and an incoming edge of 'to'.
+ from.outEdges.remove(to.getOperatorKey());
+ to.inEdges.remove(from.getOperatorKey());
+ return super.disconnect(from, to);
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Tue Apr 1 19:49:09 2014
@@ -18,15 +18,20 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.VertexGroup;
import com.google.common.collect.Maps;
@@ -126,8 +131,11 @@ public class TezOperator extends Operato
OPER_FEATURE feature = OPER_FEATURE.NONE;
+ private List<OperatorKey> vertexGroupPredecessors;
// For union
- private VertexGroup group = null;
+ private VertexGroupInfo vertexGroupInfo;
+ // Mapping of POStore OperatorKey to the OperatorKey of its vertexGroup TezOperator
+ private Map<OperatorKey, OperatorKey> vertexGroupStores = null;
public TezOperator(OperatorKey k) {
super(k);
@@ -245,18 +253,48 @@ public class TezOperator extends Operato
this.useSecondaryKey = useSecondaryKey;
}
- public void setVertexGroup(VertexGroup group) {
- this.group = group;
+ public List<OperatorKey> getUnionPredecessors() {
+ return vertexGroupPredecessors;
}
- public VertexGroup getVertexGroup() {
- return this.group;
+ public List<OperatorKey> getVertexGroupPredecessors() {
+ return vertexGroupPredecessors;
+ }
+ // Records a predecessor whose output feeds the union; the list is created on first use.
+ public void addUnionPredecessor(OperatorKey unionPredecessor) {
+ if (vertexGroupPredecessors == null) {
+ vertexGroupPredecessors = new ArrayList<OperatorKey>();
+ }
+ this.vertexGroupPredecessors.add(unionPredecessor);
+ }
+ // getUnionPredecessors() and getVertexGroupPredecessors() both return this list (null if nothing was added or set).
+ public void setVertexGroupPredecessors(List<OperatorKey> vertexGroupPredecessors) {
+ this.vertexGroupPredecessors = vertexGroupPredecessors;
}
// Union is the only operator that uses alias vertex (VertexGroup) now. But
// more operators could be added to the list in the future.
- public boolean isAliasVertex() {
- return isUnion();
+ public boolean isVertexGroup() {
+ return vertexGroupInfo != null;
+ }
+ // Null unless this operator is a vertex group.
+ public VertexGroupInfo getVertexGroupInfo() {
+ return vertexGroupInfo;
+ }
+
+ public void setVertexGroupInfo(VertexGroupInfo vertexGroup) {
+ this.vertexGroupInfo = vertexGroup;
+ }
+ // Maps the key of a POStore in this operator to the key of the vertex group TezOperator that writes that store's output.
+ public void addVertexGroupStore(OperatorKey storeKey, OperatorKey vertexGroupKey) {
+ if (this.vertexGroupStores == null) {
+ this.vertexGroupStores = new HashMap<OperatorKey, OperatorKey>();
+ }
+ this.vertexGroupStores.put(storeKey, vertexGroupKey);
+ }
+ // Null if addVertexGroupStore has never been called.
+ public Map<OperatorKey, OperatorKey> getVertexGroupStores() {
+ return this.vertexGroupStores;
}
@Override
@@ -353,5 +391,60 @@ public class TezOperator extends Operato
return combineSmallSplits;
}
+ public static class VertexGroupInfo {
+ // Details of a TezOperator that becomes a Tez VertexGroup; a non-null instance is what makes TezOperator.isVertexGroup() true.
+ private List<OperatorKey> inputKeys;
+ private String outputKey;
+ private POStore store;
+ private OutputDescriptor storeOutDescriptor;
+ private VertexGroup vertexGroup;
+
+ public VertexGroupInfo() {
+ }
+ // A store given here is added by TezDagBuilder as an output of the created VertexGroup.
+ public VertexGroupInfo(POStore store) {
+ this.store = store;
+ }
+ // Returns null if addInput has never been called.
+ public List<OperatorKey> getInputs() {
+ return inputKeys;
+ }
+
+ public void addInput(OperatorKey input) {
+ if (inputKeys == null) {
+ inputKeys = new ArrayList<OperatorKey>();
+ }
+ this.inputKeys.add(input);
+ }
+
+ public String getOutput() {
+ return outputKey;
+ }
+
+ public void setOutput(String output) {
+ this.outputKey = output;
+ }
+
+ public POStore getStore() {
+ return store;
+ }
+ // Null until setStoreOutputDescriptor is called; TezDagBuilder passes it to VertexGroup.addOutput for getStore().
+ public OutputDescriptor getStoreOutputDescriptor() {
+ return storeOutDescriptor;
+ }
+
+ public void setStoreOutputDescriptor(OutputDescriptor storeOutDescriptor) {
+ this.storeOutDescriptor = storeOutDescriptor;
+ }
+ // Set by TezDagBuilder with the result of dag.createVertexGroup().
+ public VertexGroup getVertexGroup() {
+ return vertexGroup;
+ }
+
+ public void setVertexGroup(VertexGroup vertexGroup) {
+ this.vertexGroup = vertexGroup;
+ }
+
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java Tue Apr 1 19:49:09 2014
@@ -30,6 +30,12 @@ import org.apache.tez.runtime.api.Logica
*/
public interface TezOutput {
+ /**
+ * Returns the keys of the Tez outputs this operator writes to.
+ */
+ public String[] getTezOutputs();
+ /**
+ * Replaces {@code oldOutputKey} with {@code newOutputKey} if this operator
+ * currently uses it as an output key. Presumably a no-op when it does not
+ * (the POCounterTez and POCounterStatsTez implementations behave that way);
+ * confirm for other implementors.
+ */
+ public void replaceOutput(String oldOutputKey, String newOutputKey);
+ /**
+ * Binds this operator to its Tez outputs, presumably looked up in
+ * {@code outputs} by the keys returned from {@link #getTezOutputs()}.
+ */
public void attachOutputs(Map<String, LogicalOutput> outputs,
Configuration conf) throws ExecException;
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java Tue Apr 1 19:49:09 2014
@@ -66,6 +66,10 @@ public class TezPOPackageAnnotator exten
List<TezOperator> preds = this.mPlan.getPredecessors(pkgTezOp);
for (Iterator<TezOperator> it = preds.iterator(); it.hasNext();) {
TezOperator predTezOp = it.next();
+ if (predTezOp.isVertexGroup()) {
+ // Just get one of the inputs to vertex group
+ predTezOp = getPlan().getOperator(predTezOp.getVertexGroupPredecessors().get(0));
+ }
lrFound += patchPackage(predTezOp, pkgTezOp, pkg);
if(lrFound == pkg.getNumInps()) {
break;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Tue Apr 1 19:49:09 2014
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator.VertexGroupInfo;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -51,8 +52,11 @@ public class TezPrinter extends TezOpPla
@Override
public void visitTezOp(TezOperator tezOper) throws VisitorException {
- if (tezOper.isAliasVertex()) {
- mStream.println("Tez vertex group " + tezOper.getOperatorKey().toString());
+ if (tezOper.isVertexGroup()) {
+ VertexGroupInfo info = tezOper.getVertexGroupInfo();
+ mStream.println("Tez vertex group "
+ + tezOper.getOperatorKey().toString() + "\t<-\t "
+ + info.getInputs() + "\t->\t " + info.getOutput());
mStream.println("# No plan on vertex group");
} else {
mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java Tue Apr 1 19:49:09 2014
@@ -33,7 +33,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -47,7 +47,7 @@ import org.apache.tez.runtime.library.ap
/**
* POCounterStatsTez is used to group counters from previous vertex POCounterTez tasks
*/
-public class POCounterStatsTez extends PhysicalOperator implements TezLoad, TezOutput {
+public class POCounterStatsTez extends PhysicalOperator implements TezInput, TezOutput {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(POCounterStatsTez.class);
@@ -63,6 +63,18 @@ public class POCounterStatsTez extends P
}
@Override
+ public String[] getTezInputs() {
+ return new String[] { inputKey };
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(inputKey)) {
+ inputKey = newInputKey;
+ }
+ }
+
+ @Override
public void addInputsToSkip(Set<String> inputsToSkip) {
// No-op: this operator adds nothing to inputsToSkip.
}
@@ -83,6 +95,18 @@ public class POCounterStatsTez extends P
}
@Override
+ public String[] getTezOutputs() {
+ return new String[] { outputKey };
+ }
+
+ @Override
+ public void replaceOutput(String oldOutputKey, String newOutputKey) {
+ if (oldOutputKey.equals(outputKey)) {
+ outputKey = newOutputKey;
+ }
+ }
+
+ @Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
Configuration conf) throws ExecException {
LogicalOutput output = outputs.get(outputKey);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java Tue Apr 1 19:49:09 2014
@@ -54,6 +54,12 @@ public class POCounterTez extends POCoun
super(copy);
}
+ /**
+ * Uses the task index reported by {@code processorContext} as this
+ * operator's task id.
+ */
+ @Override
+ public void initialize(TezProcessorContext processorContext)
+ throws ExecException {
+ setTaskId(processorContext.getTaskIndex());
+ }
+ /**
+ * Sets {@code tuplesOutputKey}, the first of the two keys reported by
+ * getTezOutputs().
+ */
public void setTuplesOutputKey(String tuplesOutputKey) {
this.tuplesOutputKey = tuplesOutputKey;
}
@@ -63,9 +69,17 @@ public class POCounterTez extends POCoun
}
@Override
- public void initialize(TezProcessorContext processorContext)
- throws ExecException {
- this.setTaskId(processorContext.getTaskIndex());
+ public String[] getTezOutputs() {
+ return new String[] { tuplesOutputKey, statsOutputKey };
+ }
+
+ @Override
+ public void replaceOutput(String oldOutputKey, String newOutputKey) {
+ if (oldOutputKey.equals(tuplesOutputKey)) {
+ tuplesOutputKey = newOutputKey;
+ } else if (oldOutputKey.equals(statsOutputKey)) {
+ statsOutputKey = newOutputKey;
+ }
}
@Override
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java Tue Apr 1 19:49:09 2014
@@ -32,13 +32,13 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.tez.ObjectCache;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;
-public class PORankTez extends PORank implements TezLoad {
+public class PORankTez extends PORank implements TezInput {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(PORankTez.class);
@@ -62,6 +62,20 @@ public class PORankTez extends PORank im
}
@Override
+ public String[] getTezInputs() {
+ return new String[] { tuplesInputKey, statsInputKey };
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(tuplesInputKey)) {
+ tuplesInputKey = newInputKey;
+ } else if (oldInputKey.equals(statsInputKey)) {
+ statsInputKey = newInputKey;
+ }
+ }
+
+ @Override
public void addInputsToSkip(Set<String> inputsToSkip) {
String cacheKey = "rankstats-" + getOperatorKey().toString();
Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);