You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/25 23:21:28 UTC
svn commit: r1581564 - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/tools/pigstats/tez/
test/org/apache/pig/test/data/GoldenFiles/ test/org/apache/pig/tez/
Author: cheolsoo
Date: Tue Mar 25 22:21:27 2014
New Revision: 1581564
URL: http://svn.apache.org/r1581564
Log:
PIG-3743: Use VertexGroup and Alias vertex for union (cheolsoo)
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
Removed:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POUnionTezLoad.java
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java?rev=1581564&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java Tue Mar 25 22:21:27 2014
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * POVertexGroupInputTez is used on the backend to union tuples from Tez
+ * ConcatenatedMergedInputs
+ */
+public class POVertexGroupInputTez extends PhysicalOperator implements TezLoad {
+
+ private static final long serialVersionUID = 1L;
+
+ private boolean hasNext;
+ private String inputKey;
+ private transient KeyValuesReader reader;
+
+ public POVertexGroupInputTez(OperatorKey k) {
+ super(k);
+ }
+
+ public String getInputKey() {
+ return inputKey;
+ }
+
+ public void setInputKey(String inputKey) {
+ this.inputKey = inputKey;
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
+ throws ExecException {
+ try {
+ LogicalInput input = inputs.get(inputKey);
+ if (input == null) {
+ throw new ExecException("Input GroupVertex " + inputKey + " is missing");
+ }
+ reader = (KeyValuesReader) input.getReader();
+ hasNext = reader.next();
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+ try {
+ while (hasNext) {
+ if (reader.getCurrentValues().iterator().hasNext()) {
+ NullableTuple val = (NullableTuple) reader.getCurrentValues().iterator().next();
+ return new Result(POStatus.STATUS_OK, val.getValueAsPigType());
+ }
+ hasNext = reader.next();
+ }
+ return RESULT_EOP;
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visit(this);
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public String name() {
+ return "POVertexGroupInputTez - " + inputKey+ "\t->\t " + mKey.toString();
+ }
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Mar 25 22:21:27 2014
@@ -1968,27 +1968,45 @@ public class TezCompiler extends PhyPlan
@Override
public void visitUnion(POUnion op) throws VisitorException {
try {
- // Need to add POLocalRearrange to the end of each previous tezOp
- // before we broadcast.
+ // Add alias vertex. This will be converted to VertexGroup by
+ // TezDagBuilder.
+ TezOperator newTezOp = getTezOp();
+ tezPlan.add(newTezOp);
+ POLocalRearrangeTez[] outputs = new POLocalRearrangeTez[compiledInputs.length];
for (int i = 0; i < compiledInputs.length; i++) {
- POLocalRearrangeTez lr = localRearrangeFactory.create(i, LocalRearrangeType.STAR);
- lr.setAlias(op.getAlias());
- lr.setUnion(true);
- compiledInputs[i].plan.addAsLeaf(lr);
+ TezOperator prevTezOp = compiledInputs[i];
+ TezCompilerUtil.connect(tezPlan, prevTezOp, newTezOp);
+ // TODO: Use POValueOutputTez instead of POLocalRearrange and
+ // unsorted shuffle with TEZ-661 and PIG-3775.
+ outputs[i] = localRearrangeFactory.create(LocalRearrangeType.NULL);
+ prevTezOp.plan.addAsLeaf(outputs[i]);
+ prevTezOp.setClosed(true);
}
+ OperatorKey unionKey = newTezOp.getOperatorKey();
+ newTezOp.markUnion();
+ curTezOp = newTezOp;
+
+ // Start a new TezOp so that the successor in physical plan can be
+ // added to it.
+ newTezOp = getTezOp();
+ tezPlan.add(newTezOp);
+ tezPlan.connect(curTezOp, newTezOp);
+
+ // Connect the POValueOutputTezs in the predecessor vertices to the
+ // succeeding vertex.
+ for (int i = 0; i < outputs.length; i++) {
+ outputs[i].setOutputKey(newTezOp.getOperatorKey().toString());
+ }
+ // The first operator in the succeeding vertex must be
+ // POVertexGroupInputTez.
+ POVertexGroupInputTez grpInput = new POVertexGroupInputTez(newTezOp.getOperatorKey());
+ grpInput.setInputKey(unionKey.toString());
+ grpInput.setAlias(op.getAlias());
+ newTezOp.plan.add(grpInput);
+ curTezOp = newTezOp;
- // Mark the start of a new TezOperator, connecting the inputs. Note
- // the parallelism is currently fixed to 1 for all TezOperators.
- blocking();
-
- // Then add a POPackage to the start of the new tezOp.
- POPackage pkg = getPackage(compiledInputs.length, DataType.TUPLE);
- pkg.setAlias(op.getAlias());
- curTezOp.markUnion();
- curTezOp.plan.add(pkg);
curTezOp.setRequestedParallelism(op.getRequestedParallelism());
phyToTezOpMap.put(op, curTezOp);
- // TODO: Use alias vertex that is introduced by TEZ-678
} catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator " + op.getClass().getSimpleName();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Mar 25 22:21:27 2014
@@ -93,10 +93,15 @@ import org.apache.tez.common.TezJobConfi
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.mapreduce.combine.MRCombiner;
import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
@@ -106,6 +111,9 @@ import org.apache.tez.mapreduce.hadoop.M
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
/**
* A visitor to construct DAG out of Tez plan.
@@ -140,9 +148,14 @@ public class TezDagBuilder extends TezOp
// Construct vertex for the current Tez operator
Vertex to = null;
try {
- boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false;
- to = newVertex(tezOp, isMap);
- dag.addVertex(to);
+ if (!tezOp.isAliasVertex()) {
+ boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false;
+ to = newVertex(tezOp, isMap);
+ dag.addVertex(to);
+ } else {
+ // For union, we construct VertexGroup after iterating the
+ // predecessors.
+ }
} catch (Exception e) {
throw new VisitorException("Cannot create vertex for "
+ tezOp.name(), e);
@@ -150,23 +163,57 @@ public class TezDagBuilder extends TezOp
// Connect the new vertex with predecessor vertices
if (predecessors != null) {
- for (TezOperator predecessor : predecessors) {
+ Vertex[] groupMembers = new Vertex[predecessors.size()];
+
+ for (int i = 0; i < predecessors.size(); i++) {
// Since this is a dependency order walker, predecessor vertices
// must have already been created.
- Vertex from = dag.getVertex(predecessor.getOperatorKey().toString());
- EdgeProperty prop = null;
+ TezOperator pred = predecessors.get(i);
try {
- prop = newEdge(predecessor, tezOp);
+ if (pred.isAliasVertex()) {
+ VertexGroup from = pred.getVertexGroup();
+ GroupInputEdge edge = newGroupInputEdge(from, to);
+ dag.addEdge(edge);
+ } else {
+ Vertex from = dag.getVertex(pred.getOperatorKey().toString());
+ EdgeProperty prop = newEdge(pred, tezOp);
+ if (tezOp.isAliasVertex()) {
+ groupMembers[i] = from;
+ } else {
+ Edge edge = new Edge(from, to, prop);
+ dag.addEdge(edge);
+ }
+ }
} catch (IOException e) {
throw new VisitorException("Cannot create edge from "
- + predecessor.name() + " to " + tezOp.name(), e);
+ + pred.name() + " to " + tezOp.name(), e);
}
- Edge edge = new Edge(from, to, prop);
- dag.addEdge(edge);
+ }
+
+ if (tezOp.isAliasVertex()) {
+ String groupName = tezOp.getOperatorKey().toString();
+ tezOp.setVertexGroup(dag.createVertexGroup(groupName, groupMembers));
}
}
}
+ /**
+  * Creates the edge that connects a vertex group to its consuming vertex.
+  *
+  * The edge is SCATTER_GATHER / PERSISTED / SEQUENTIAL, written with
+  * OnFileSortedOutput and read with ShuffledMergedInput; the per-member
+  * inputs are combined through ConcatenatedMergedKeyValuesInput. Intermediate
+  * key/value classes are configured for DataType.BYTEARRAY with no TezOperator.
+  *
+  * @param from the vertex group producing the data
+  * @param to the vertex consuming the data
+  * @return the group input edge to add to the DAG
+  * @throws IOException if the configuration cannot be prepared or serialized
+  */
+ private GroupInputEdge newGroupInputEdge(VertexGroup from, Vertex to)
+         throws IOException {
+     Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+     setIntermediateInputKeyValue(DataType.BYTEARRAY, conf, null);
+     setIntermediateOutputKeyValue(DataType.BYTEARRAY, conf, null);
+     MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
+
+     // Both descriptors carry their own serialized copy of the same configuration.
+     OutputDescriptor sortedOutput =
+             new OutputDescriptor(OnFileSortedOutput.class.getName())
+                     .setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+     InputDescriptor shuffledInput =
+             new InputDescriptor(ShuffledMergedInput.class.getName())
+                     .setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+     EdgeProperty edgeProperty = new EdgeProperty(
+             DataMovementType.SCATTER_GATHER,
+             DataSourceType.PERSISTED,
+             SchedulingType.SEQUENTIAL,
+             sortedOutput,
+             shuffledInput);
+     InputDescriptor mergedInput =
+             new InputDescriptor(ConcatenatedMergedKeyValuesInput.class.getName());
+
+     return new GroupInputEdge(from, to, edgeProperty, mergedInput);
+ }
+
/**
* Return EdgeProperty that connects two vertices.
*
@@ -363,11 +410,7 @@ public class TezDagBuilder extends TezOp
payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
setIntermediateInputKeyValue(keyType, payloadConf, tezOp);
POShuffleTezLoad newPack;
- if (tezOp.isUnion()) {
- newPack = new POUnionTezLoad(pack);
- } else {
- newPack = new POShuffleTezLoad(pack);
- }
+ newPack = new POShuffleTezLoad(pack);
if (tezOp.isSkewedJoin()) {
newPack.setSkewedJoins(true);
}
@@ -381,7 +424,8 @@ public class TezDagBuilder extends TezOp
if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) {
// skip sample vertex input
} else {
- LinkedList<POLocalRearrangeTez> lrs = PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
+ LinkedList<POLocalRearrangeTez> lrs =
+ PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
if (lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
localRearrangeMap.put((int)lr.getIndex(), pred.getOperatorKey().toString());
@@ -399,8 +443,7 @@ public class TezDagBuilder extends TezOp
}
}
- setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf,
- tezOp);
+ setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf, tezOp);
} else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) {
POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0);
// TODO Need to fix multiple input key mapping
@@ -660,10 +703,10 @@ public class TezDagBuilder extends TezOp
@SuppressWarnings("rawtypes")
private void setIntermediateInputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
throws JobCreationException, ExecException {
- if (tezOp.isUseSecondaryKey()) {
+ if (tezOp != null && tezOp.isUseSecondaryKey()) {
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
NullableTuple.class.getName());
- } else if (tezOp.isSkewedJoin()) {
+ } else if (tezOp != null && tezOp.isSkewedJoin()) {
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
NullablePartitionWritable.class.getName());
} else {
@@ -683,7 +726,7 @@ public class TezDagBuilder extends TezOp
throws JobCreationException, ExecException {
Class<? extends WritableComparable> keyClass = HDataType
.getWritableComparableTypes(keyType).getClass();
- if (tezOp.isSkewedJoin()) {
+ if (tezOp != null && tezOp.isSkewedJoin()) {
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
NullablePartitionWritable.class.getName());
} else {
@@ -756,7 +799,7 @@ public class TezDagBuilder extends TezOp
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
// TODO: Group comparators as in JobControlCompiler
- if (tezOp.isUseSecondaryKey()) {
+ if (tezOp != null && tezOp.isUseSecondaryKey()) {
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
PigSecondaryKeyComparator.class.getName());
conf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS,
@@ -764,7 +807,7 @@ public class TezDagBuilder extends TezOp
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
PigSecondaryKeyGroupComparator.class.getName());
} else {
- if (tezOp.isSkewedJoin()) {
+ if (tezOp != null && tezOp.isSkewedJoin()) {
// TODO: PigGroupingPartitionWritableComparator only used as Group comparator in MR.
// What should be TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS if same as MR?
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
@@ -784,7 +827,7 @@ public class TezDagBuilder extends TezOp
void selectOutputComparator(byte keyType, Configuration conf, TezOperator tezOp)
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
- if (tezOp.isSkewedJoin()) {
+ if (tezOp != null && tezOp.isSkewedJoin()) {
// TODO: PigGroupingPartitionWritableComparator only used as Group comparator in MR.
// What should be TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS if same as MR?
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Tue Mar 25 22:21:27 2014
@@ -27,6 +27,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.VertexGroup;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -125,6 +126,9 @@ public class TezOperator extends Operato
OPER_FEATURE feature = OPER_FEATURE.NONE;
+ // For union
+ private VertexGroup group = null;
+
public TezOperator(OperatorKey k) {
super(k);
plan = new PhysicalPlan();
@@ -241,6 +245,20 @@ public class TezOperator extends Operato
this.useSecondaryKey = useSecondaryKey;
}
+ /**
+  * Stores the Tez VertexGroup associated with this operator (used for union).
+  *
+  * @param group the vertex group to remember
+  */
+ public void setVertexGroup(VertexGroup group) {
+ this.group = group;
+ }
+
+ /**
+  * Returns the Tez VertexGroup stored on this operator.
+  *
+  * @return the stored vertex group, or null if none has been set
+  */
+ public VertexGroup getVertexGroup() {
+ return this.group;
+ }
+
+ /**
+  * Tells whether this operator is an alias vertex (VertexGroup).
+  *
+  * Union is the only operator that uses alias vertex (VertexGroup) now. But
+  * more operators could be added to the list in the future.
+  *
+  * @return the value of isUnion()
+  */
+ public boolean isAliasVertex() {
+ return isUnion();
+ }
+
@Override
public String name() {
String udfStr = getUDFsAsStr();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Tue Mar 25 22:21:27 2014
@@ -51,7 +51,12 @@ public class TezPrinter extends TezOpPla
@Override
public void visitTezOp(TezOperator tezOper) throws VisitorException {
- mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
+ if (tezOper.isAliasVertex()) {
+ mStream.println("Tez vertex group " + tezOper.getOperatorKey().toString());
+ mStream.println("# No plan on vertex group");
+ } else {
+ mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
+ }
if (tezOper.inEdges.size() > 0) {
for (Entry<OperatorKey, TezEdgeDescriptor> inEdge : tezOper.inEdges.entrySet()) {
//TODO: Print other edge properties like custom partitioner
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java Tue Mar 25 22:21:27 2014
@@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.InputStats;
@@ -63,6 +64,7 @@ public class TezStats extends PigStats {
private List<String> dagStatsStrings;
private Map<String, TezTaskStats> tezOpVertexMap;
+ private List<TezTaskStats> taskStatsToBeRemoved;
/**
* This class builds the Tez DAG from a Tez plan.
@@ -86,6 +88,11 @@ public class TezStats extends PigStats {
}
}
}
+ // Remove VertexGroups (union) from JobGraph since they're not
+ // materialized as real vertices by Tez.
+ if (tezOp.isAliasVertex()) {
+ taskStatsToBeRemoved.add(currStats);
+ }
tezOpVertexMap.put(tezOp.getOperatorKey().toString(), currStats);
}
}
@@ -95,13 +102,17 @@ public class TezStats extends PigStats {
this.jobPlan = new JobGraph();
this.tezOpVertexMap = Maps.newHashMap();
this.dagStatsStrings = Lists.newArrayList();
+ this.taskStatsToBeRemoved = Lists.newArrayList();
}
public void initialize(TezOperPlan tezPlan) {
super.start();
try {
new JobGraphBuilder(tezPlan).visit();
- } catch (VisitorException e) {
+ for (TezTaskStats taskStat : taskStatsToBeRemoved) {
+ jobPlan.removeAndReconnect(taskStat);
+ }
+ } catch (FrontendException e) {
LOG.warn("Unable to build Tez DAG", e);
}
}
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld?rev=1581564&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld Tue Mar 25 22:21:27 2014
@@ -0,0 +1,52 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-26
+#--------------------------------------------------
+Tez vertex scope-18 -> Tez vertex scope-20,
+Tez vertex scope-19 -> Tez vertex scope-20,
+Tez vertex scope-20 -> Tez vertex scope-25,
+Tez vertex scope-25
+
+Tez vertex scope-18
+# Plan on vertex
+Local Rearrange[tuple]{bytearray}(false) - scope-22 -> scope-25
+| |
+| Constant(DummyVal) - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[chararray] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
+# Plan on vertex
+Local Rearrange[tuple]{bytearray}(false) - scope-24 -> scope-25
+| |
+| Constant(DummyVal) - scope-23
+|
+|---c: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][1] - scope-9
+ | |
+ | Cast[chararray] - scope-13
+ | |
+ | |---Project[bytearray][0] - scope-12
+ |
+ |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex group scope-20
+# No plan on vertex group
+Tez vertex scope-25
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-17
+|
+|---POVertexGroupInputTez - scope-20 -> scope-25
Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Tue Mar 25 22:21:27 2014
@@ -313,6 +313,17 @@ public class TestTezCompiler {
run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC18.gld");
}
+ /**
+  * Compiles a two-input "union onschema" script and checks the resulting
+  * Tez plan against the TEZC19 golden file.
+  */
+ @Test
+ public void testUnion() throws Exception {
+     final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld";
+
+     StringBuilder script = new StringBuilder();
+     script.append("a = load 'file:///tmp/input' as (x:int, y:chararray);");
+     script.append("b = load 'file:///tmp/input' as (y:chararray, x:int);");
+     script.append("c = union onschema a, b;");
+     script.append("store c into 'file:///tmp/output';");
+
+     run(script.toString(), goldenFile);
+ }
+
private void run(String query, String expectedFile) throws Exception {
PhysicalPlan pp = Util.buildPp(pigServer, query);
TezLauncher launcher = new TezLauncher();