You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/25 23:21:28 UTC

svn commit: r1581564 - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/tools/pigstats/tez/ test/org/apache/pig/test/data/GoldenFiles/ test/org/apache/pig/tez/

Author: cheolsoo
Date: Tue Mar 25 22:21:27 2014
New Revision: 1581564

URL: http://svn.apache.org/r1581564
Log:
PIG-3743: Use VertexGroup and Alias vertex for union (cheolsoo)

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
Removed:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POUnionTezLoad.java
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java?rev=1581564&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java Tue Mar 25 22:21:27 2014
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * POVertexGroupInputTez is used on the backend to union tuples from the
+ * members of a Tez VertexGroup, which are read through a single
+ * ConcatenatedMergedKeyValuesInput. Keys are ignored; every value of every
+ * key is emitted as a tuple.
+ */
+public class POVertexGroupInputTez extends PhysicalOperator implements TezLoad {
+
+    private static final long serialVersionUID = 1L;
+
+    // Result of the last reader.next(); getNextTuple returns EOP once false.
+    private boolean hasNext;
+    // Name of the LogicalInput (the union's vertex group) to read from.
+    private String inputKey;
+    private transient KeyValuesReader reader;
+    // Values of the reader's current key; fetched once per key.
+    private transient Iterator<?> currentValues;
+
+    public POVertexGroupInputTez(OperatorKey k) {
+        super(k);
+    }
+
+    public String getInputKey() {
+        return inputKey;
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    /**
+     * Looks up the input named by inputKey and positions its reader on the
+     * first key.
+     *
+     * @throws ExecException if the input is missing or cannot be read
+     */
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
+            throws ExecException {
+        try {
+            LogicalInput input = inputs.get(inputKey);
+            if (input == null) {
+                throw new ExecException("Input GroupVertex " + inputKey + " is missing");
+            }
+            reader = (KeyValuesReader) input.getReader();
+            currentValues = null;
+            hasNext = reader.next();
+        } catch (ExecException e) {
+            // Already descriptive (e.g. missing input); do not wrap it again.
+            throw e;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * Returns the next value of the grouped input as a tuple.
+     *
+     * @return STATUS_OK with the next tuple, or EOP when the input is consumed
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            while (hasNext) {
+                if (currentValues == null) {
+                    // Iterable does not promise that repeated iterator() calls
+                    // share a position, so fetch the iterator once per key.
+                    currentValues = reader.getCurrentValues().iterator();
+                }
+                if (currentValues.hasNext()) {
+                    NullableTuple val = (NullableTuple) currentValues.next();
+                    return new Result(POStatus.STATUS_OK, val.getValueAsPigType());
+                }
+                // Current key is exhausted; advance to the next one.
+                currentValues = null;
+                hasNext = reader.next();
+            }
+            return RESULT_EOP;
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        // Not implemented: this operator does not skip any inputs.
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // Illustrate markup is not implemented for this operator.
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return "POVertexGroupInputTez - " + inputKey+ "\t->\t " + mKey.toString();
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Mar 25 22:21:27 2014
@@ -1968,27 +1968,45 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitUnion(POUnion op) throws VisitorException {
         try {
-            // Need to add POLocalRearrange to the end of each previous tezOp
-            // before we broadcast.
+            // Add alias vertex. This will be converted to VertexGroup by
+            // TezDagBuilder.
+            TezOperator newTezOp = getTezOp();
+            tezPlan.add(newTezOp);
+            POLocalRearrangeTez[] outputs = new POLocalRearrangeTez[compiledInputs.length];
             for (int i = 0; i < compiledInputs.length; i++) {
-                POLocalRearrangeTez lr = localRearrangeFactory.create(i, LocalRearrangeType.STAR);
-                lr.setAlias(op.getAlias());
-                lr.setUnion(true);
-                compiledInputs[i].plan.addAsLeaf(lr);
+                TezOperator prevTezOp = compiledInputs[i];
+                TezCompilerUtil.connect(tezPlan, prevTezOp, newTezOp);
+                // TODO: Use POValueOutputTez instead of POLocalRearrange and
+                // unsorted shuffle with TEZ-661 and PIG-3775.
+                outputs[i] = localRearrangeFactory.create(LocalRearrangeType.NULL);
+                prevTezOp.plan.addAsLeaf(outputs[i]);
+                prevTezOp.setClosed(true);
             }
+            OperatorKey unionKey = newTezOp.getOperatorKey();
+            newTezOp.markUnion();
+            curTezOp = newTezOp;
+
+            // Start a new TezOp so that the successor in physical plan can be
+            // added to it.
+            newTezOp = getTezOp();
+            tezPlan.add(newTezOp);
+            tezPlan.connect(curTezOp, newTezOp);
+
+            // Connect the POValueOutputTezs in the predecessor vertices to the
+            // succeeding vertex.
+            for (int i = 0; i < outputs.length; i++) {
+                outputs[i].setOutputKey(newTezOp.getOperatorKey().toString());
+            }
+            // The first operator in the succeeding vertex must be
+            // POVertexGroupInputTez.
+            POVertexGroupInputTez grpInput = new POVertexGroupInputTez(newTezOp.getOperatorKey());
+            grpInput.setInputKey(unionKey.toString());
+            grpInput.setAlias(op.getAlias());
+            newTezOp.plan.add(grpInput);
+            curTezOp = newTezOp;
 
-            // Mark the start of a new TezOperator, connecting the inputs. Note
-            // the parallelism is currently fixed to 1 for all TezOperators.
-            blocking();
-
-            // Then add a POPackage to the start of the new tezOp.
-            POPackage pkg = getPackage(compiledInputs.length, DataType.TUPLE);
-            pkg.setAlias(op.getAlias());
-            curTezOp.markUnion();
-            curTezOp.plan.add(pkg);
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             phyToTezOpMap.put(op, curTezOp);
-            // TODO: Use alias vertex that is introduced by TEZ-678
         } catch (Exception e) {
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Mar 25 22:21:27 2014
@@ -93,10 +93,15 @@ import org.apache.tez.common.TezJobConfi
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
@@ -106,6 +111,9 @@ import org.apache.tez.mapreduce.hadoop.M
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 /**
  * A visitor to construct DAG out of Tez plan.
@@ -140,9 +148,14 @@ public class TezDagBuilder extends TezOp
         // Construct vertex for the current Tez operator
         Vertex to = null;
         try {
-            boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false;
-            to = newVertex(tezOp, isMap);
-            dag.addVertex(to);
+            if (!tezOp.isAliasVertex()) {
+                boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false;
+                to = newVertex(tezOp, isMap);
+                dag.addVertex(to);
+            } else {
+                // For union, we construct VertexGroup after iterating the
+                // predecessors.
+            }
         } catch (Exception e) {
             throw new VisitorException("Cannot create vertex for "
                     + tezOp.name(), e);
@@ -150,23 +163,57 @@ public class TezDagBuilder extends TezOp
 
         // Connect the new vertex with predecessor vertices
         if (predecessors != null) {
-            for (TezOperator predecessor : predecessors) {
+            Vertex[] groupMembers = new Vertex[predecessors.size()];
+
+            for (int i = 0; i < predecessors.size(); i++) {
                 // Since this is a dependency order walker, predecessor vertices
                 // must have already been created.
-                Vertex from = dag.getVertex(predecessor.getOperatorKey().toString());
-                EdgeProperty prop = null;
+                TezOperator pred = predecessors.get(i);
                 try {
-                    prop = newEdge(predecessor, tezOp);
+                    if (pred.isAliasVertex()) {
+                        VertexGroup from = pred.getVertexGroup();
+                        GroupInputEdge edge = newGroupInputEdge(from, to);
+                        dag.addEdge(edge);
+                    } else {
+                        Vertex from = dag.getVertex(pred.getOperatorKey().toString());
+                        EdgeProperty prop = newEdge(pred, tezOp);
+                        if (tezOp.isAliasVertex()) {
+                            groupMembers[i] = from;
+                        } else {
+                            Edge edge = new Edge(from, to, prop);
+                            dag.addEdge(edge);
+                        }
+                    }
                 } catch (IOException e) {
                     throw new VisitorException("Cannot create edge from "
-                            + predecessor.name() + " to " + tezOp.name(), e);
+                            + pred.name() + " to " + tezOp.name(), e);
                 }
-                Edge edge = new Edge(from, to, prop);
-                dag.addEdge(edge);
+            }
+
+            if (tezOp.isAliasVertex()) {
+                String groupName = tezOp.getOperatorKey().toString();
+                tezOp.setVertexGroup(dag.createVertexGroup(groupName, groupMembers));
             }
         }
     }
 
+    /** Creates the edge from a union's VertexGroup to the vertex that consumes it. */
+    private GroupInputEdge newGroupInputEdge(VertexGroup from, Vertex to) throws IOException {
+        Configuration edgeConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+        setIntermediateInputKeyValue(DataType.BYTEARRAY, edgeConf, null);
+        setIntermediateOutputKeyValue(DataType.BYTEARRAY, edgeConf, null);
+        MRToTezHelper.convertMRToTezRuntimeConf(edgeConf, globalConf);
+        // Sorted scatter-gather shuffle; the group's members are read via one concatenated input.
+        OutputDescriptor sortedOut = new OutputDescriptor(OnFileSortedOutput.class.getName())
+                .setUserPayload(TezUtils.createUserPayloadFromConf(edgeConf));
+        InputDescriptor shuffledIn = new InputDescriptor(ShuffledMergedInput.class.getName())
+                .setUserPayload(TezUtils.createUserPayloadFromConf(edgeConf));
+        EdgeProperty prop = new EdgeProperty(DataMovementType.SCATTER_GATHER,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, sortedOut, shuffledIn);
+        InputDescriptor mergedIn = new InputDescriptor(ConcatenatedMergedKeyValuesInput.class.getName());
+        return new GroupInputEdge(from, to, prop, mergedIn);
+    }
+
     /**
      * Return EdgeProperty that connects two vertices.
      *
@@ -363,11 +410,7 @@ public class TezDagBuilder extends TezOp
             payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
             setIntermediateInputKeyValue(keyType, payloadConf, tezOp);
             POShuffleTezLoad newPack;
-            if (tezOp.isUnion()) {
-                newPack = new POUnionTezLoad(pack);
-            } else {
-                newPack = new POShuffleTezLoad(pack);
-            }
+            newPack = new POShuffleTezLoad(pack);
             if (tezOp.isSkewedJoin()) {
                 newPack.setSkewedJoins(true);
             }
@@ -381,7 +424,8 @@ public class TezDagBuilder extends TezOp
                 if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) {
                     // skip sample vertex input
                 } else {
-                    LinkedList<POLocalRearrangeTez> lrs = PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
+                    LinkedList<POLocalRearrangeTez> lrs =
+                            PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
                     for (POLocalRearrangeTez lr : lrs) {
                         if (lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
                             localRearrangeMap.put((int)lr.getIndex(), pred.getOperatorKey().toString());
@@ -399,8 +443,7 @@ public class TezDagBuilder extends TezOp
                 }
             }
 
-            setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf,
-                    tezOp);
+            setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf, tezOp);
         } else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) {
             POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0);
             // TODO Need to fix multiple input key mapping
@@ -660,10 +703,10 @@ public class TezDagBuilder extends TezOp
     @SuppressWarnings("rawtypes")
     private void setIntermediateInputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
             throws JobCreationException, ExecException {
-        if (tezOp.isUseSecondaryKey()) {
+        if (tezOp != null && tezOp.isUseSecondaryKey()) {
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
                     NullableTuple.class.getName());
-        } else if (tezOp.isSkewedJoin()) {
+        } else if (tezOp != null && tezOp.isSkewedJoin()) {
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
                     NullablePartitionWritable.class.getName());
         } else {
@@ -683,7 +726,7 @@ public class TezDagBuilder extends TezOp
             throws JobCreationException, ExecException {
         Class<? extends WritableComparable> keyClass = HDataType
                 .getWritableComparableTypes(keyType).getClass();
-        if (tezOp.isSkewedJoin()) {
+        if (tezOp != null && tezOp.isSkewedJoin()) {
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
                     NullablePartitionWritable.class.getName());
         } else {
@@ -756,7 +799,7 @@ public class TezDagBuilder extends TezOp
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
         // TODO: Group comparators as in JobControlCompiler
-        if (tezOp.isUseSecondaryKey()) {
+        if (tezOp != null && tezOp.isUseSecondaryKey()) {
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
                     PigSecondaryKeyComparator.class.getName());
             conf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS,
@@ -764,7 +807,7 @@ public class TezDagBuilder extends TezOp
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
                     PigSecondaryKeyGroupComparator.class.getName());
         } else {
-            if (tezOp.isSkewedJoin()) {
+            if (tezOp != null && tezOp.isSkewedJoin()) {
                 // TODO: PigGroupingPartitionWritableComparator only used as Group comparator in MR.
                 // What should be TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS if same as MR?
                 conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
@@ -784,7 +827,7 @@ public class TezDagBuilder extends TezOp
     void selectOutputComparator(byte keyType, Configuration conf, TezOperator tezOp)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
-        if (tezOp.isSkewedJoin()) {
+        if (tezOp != null && tezOp.isSkewedJoin()) {
             // TODO: PigGroupingPartitionWritableComparator only used as Group comparator in MR.
             // What should be TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS if same as MR?
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Tue Mar 25 22:21:27 2014
@@ -27,6 +27,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.VertexGroup;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -125,6 +126,9 @@ public class TezOperator extends Operato
 
     OPER_FEATURE feature = OPER_FEATURE.NONE;
 
+    // For union
+    private VertexGroup group = null;
+
     public TezOperator(OperatorKey k) {
         super(k);
         plan = new PhysicalPlan();
@@ -241,6 +245,20 @@ public class TezOperator extends Operato
         this.useSecondaryKey = useSecondaryKey;
     }
 
+    /** Remembers the VertexGroup that was created for this alias vertex. */
+    public void setVertexGroup(VertexGroup group) { this.group = group; }
+
+    /** Returns the VertexGroup previously set, or null if none was set. */
+    public VertexGroup getVertexGroup() { return group; }
+
+    /**
+     * True if this operator becomes a Tez VertexGroup rather than a real
+     * vertex. Only union does so today; other operators may be added later.
+     */
+    public boolean isAliasVertex() {
+        return isUnion();
+    }
+
     @Override
     public String name() {
         String udfStr = getUDFsAsStr();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Tue Mar 25 22:21:27 2014
@@ -51,7 +51,12 @@ public class TezPrinter extends TezOpPla
 
     @Override
     public void visitTezOp(TezOperator tezOper) throws VisitorException {
-        mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
+        if (tezOper.isAliasVertex()) {
+            mStream.println("Tez vertex group " + tezOper.getOperatorKey().toString());
+            mStream.println("# No plan on vertex group");
+        } else {
+            mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
+        }
         if (tezOper.inEdges.size() > 0) {
             for (Entry<OperatorKey, TezEdgeDescriptor> inEdge : tezOper.inEdges.entrySet()) {
                 //TODO: Print other edge properties like custom partitioner

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java Tue Mar 25 22:21:27 2014
@@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.InputStats;
@@ -63,6 +64,7 @@ public class TezStats extends PigStats {
 
     private List<String> dagStatsStrings;
     private Map<String, TezTaskStats> tezOpVertexMap;
+    private List<TezTaskStats> taskStatsToBeRemoved;
 
     /**
      * This class builds the Tez DAG from a Tez plan.
@@ -86,6 +88,11 @@ public class TezStats extends PigStats {
                     }
                 }
             }
+            // Remove VertexGroups (union) from JobGraph since they're not
+            // materialized as real vertices by Tez.
+            if (tezOp.isAliasVertex()) {
+                taskStatsToBeRemoved.add(currStats);
+            }
             tezOpVertexMap.put(tezOp.getOperatorKey().toString(), currStats);
         }
     }
@@ -95,13 +102,17 @@ public class TezStats extends PigStats {
         this.jobPlan = new JobGraph();
         this.tezOpVertexMap = Maps.newHashMap();
         this.dagStatsStrings = Lists.newArrayList();
+        this.taskStatsToBeRemoved = Lists.newArrayList();
     }
 
     public void initialize(TezOperPlan tezPlan) {
         super.start();
         try {
             new JobGraphBuilder(tezPlan).visit();
-        } catch (VisitorException e) {
+            for (TezTaskStats taskStat : taskStatsToBeRemoved) {
+                jobPlan.removeAndReconnect(taskStat);
+            }
+        } catch (FrontendException e) {
             LOG.warn("Unable to build Tez DAG", e);
         }
     }

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld?rev=1581564&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld Tue Mar 25 22:21:27 2014
@@ -0,0 +1,52 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-26
+#--------------------------------------------------
+Tez vertex scope-18	->	Tez vertex scope-20,
+Tez vertex scope-19	->	Tez vertex scope-20,
+Tez vertex scope-20	->	Tez vertex scope-25,
+Tez vertex scope-25
+
+Tez vertex scope-18
+# Plan on vertex
+Local Rearrange[tuple]{bytearray}(false) - scope-22	->	 scope-25
+|   |
+|   Constant(DummyVal) - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[chararray] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
+# Plan on vertex
+Local Rearrange[tuple]{bytearray}(false) - scope-24	->	 scope-25
+|   |
+|   Constant(DummyVal) - scope-23
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][1] - scope-9
+    |   |
+    |   Cast[chararray] - scope-13
+    |   |
+    |   |---Project[bytearray][0] - scope-12
+    |
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex group scope-20
+# No plan on vertex group
+Tez vertex scope-25
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-17
+|
+|---POVertexGroupInputTez - scope-20	->	 scope-25

Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1581564&r1=1581563&r2=1581564&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Tue Mar 25 22:21:27 2014
@@ -313,6 +313,17 @@ public class TestTezCompiler {
         run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC18.gld");
     }
 
+    @Test
+    public void testUnion() throws Exception {
+        // Union onschema of two loads that declare the same fields in opposite order.
+        String query = "a = load 'file:///tmp/input' as (x:int, y:chararray);"
+                + "b = load 'file:///tmp/input' as (y:chararray, x:int);"
+                + "c = union onschema a, b;"
+                + "store c into 'file:///tmp/output';";
+        String golden = "test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld";
+        run(query, golden);
+    }
+
     private void run(String query, String expectedFile) throws Exception {
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         TezLauncher launcher = new TezLauncher();