You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/22 16:59:44 UTC

svn commit: r1589152 [1/2] - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/optimiz...

Author: rohini
Date: Tue Apr 22 14:59:44 2014
New Revision: 1589152

URL: http://svn.apache.org/r1589152
Log:
PIG-3855: Turn on UnionOptimizer by default and add new e2e tests for union (rohini)

Added:
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
    pig/branches/tez/test/e2e/pig/tests/nightly.conf
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Tue Apr 22 14:59:44 2014
@@ -719,27 +719,40 @@ public class POLocalRearrange extends Ph
 
     protected void deepCopyTo(POLocalRearrange clone)
             throws CloneNotSupportedException {
-        List<PhysicalPlan> clonePlans = new ArrayList<PhysicalPlan>(
-                plans.size());
-        for (PhysicalPlan plan : plans) {
-            clonePlans.add(plan.clone());
+
+        clone.setParentPlan(parentPlan);
+        clone.index = index;
+        if (useSecondaryKey) {
+            clone.keyType = mainKeyType;
+        } else {
+            clone.keyType = keyType;
         }
+        clone.setUseSecondaryKey(useSecondaryKey);
+        // Needs to be called as setDistinct so that the fake index tuple gets
+        // created.
+        clone.setDistinct(mIsDistinct);
+        clone.setCross(isCross);
+        clone.addOriginalLocation(alias, getOriginalLocations());
+        clone.setStripKeyFromValue(stripKeyFromValue);
+
         try {
-            clone.setPlans(clonePlans);
+            clone.setPlans(clonePlans(plans));
+            if (secondaryPlans != null) {
+                clone.setSecondaryPlans(clonePlans(secondaryPlans));
+            }
         } catch (PlanException pe) {
             CloneNotSupportedException cnse = new CloneNotSupportedException("Problem with setting plans of " + this.getClass().getSimpleName());
             cnse.initCause(pe);
             throw cnse;
         }
-        clone.keyType = keyType;
-        clone.mainKeyType = mainKeyType;
-        clone.secondaryKeyType = secondaryKeyType;
-        clone.useSecondaryKey = useSecondaryKey;
-        clone.index = index;
-        // Needs to be called as setDistinct so that the fake index tuple gets
-        // created.
-        clone.setDistinct(mIsDistinct);
-        clone.addOriginalLocation(alias, getOriginalLocations());
+    }
+
+    private List<PhysicalPlan> clonePlans(List<PhysicalPlan> origPlans) throws CloneNotSupportedException {
+        List<PhysicalPlan> clonePlans = new ArrayList<PhysicalPlan>(origPlans.size());
+        for (PhysicalPlan plan : origPlans) {
+            clonePlans.add(plan.clone());
+        }
+        return clonePlans;
     }
 
     public boolean isCross() {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Tue Apr 22 14:59:44 2014
@@ -165,12 +165,8 @@ public class MultiQueryOptimizerTez exte
             parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
         }
         subPlanOper.setRequestedParallelismByReference(parentOper);
-        if (subPlanOper.UDFs != null) {
-            parentOper.UDFs.addAll(subPlanOper.UDFs);
-        }
-        if (subPlanOper.scalars != null) {
-            parentOper.scalars.addAll(subPlanOper.scalars);
-        }
+        parentOper.UDFs.addAll(subPlanOper.UDFs);
+        parentOper.scalars.addAll(subPlanOper.scalars);
         if (subPlanOper.outEdges != null) {
             for (Entry<OperatorKey, TezEdgeDescriptor> entry: subPlanOper.outEdges.entrySet()) {
                 parentOper.outEdges.put(entry.getKey(), entry.getValue());

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java Tue Apr 22 14:59:44 2014
@@ -39,7 +39,6 @@ import org.apache.pig.impl.io.PigNullabl
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 
 import com.google.common.collect.Lists;
 
@@ -53,7 +52,7 @@ public class POFRJoinTez extends POFRJoi
     private static final long serialVersionUID = 1L;
 
     // For replicated tables
-    private List<ShuffledUnorderedKVInput> replInputs = Lists.newArrayList();
+    private List<LogicalInput> replInputs = Lists.newArrayList();
     private List<KeyValueReader> replReaders = Lists.newArrayList();
     private List<String> inputKeys;
     private transient boolean isInputCached;
@@ -94,11 +93,8 @@ public class POFRJoinTez extends POFRJoi
         try {
             for (String key : inputKeys) {
                 LogicalInput input = inputs.get(key);
-                if (input instanceof ShuffledUnorderedKVInput) {
-                    ShuffledUnorderedKVInput suInput = (ShuffledUnorderedKVInput) input;
-                    this.replInputs.add(suInput);
-                    this.replReaders.add((KeyValueReader) suInput.getReader());
-                }
+                this.replInputs.add(input);
+                this.replReaders.add((KeyValueReader) input.getReader());
             }
         } catch (Exception e) {
             throw new ExecException(e);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java Tue Apr 22 14:59:44 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -47,6 +49,7 @@ import org.apache.tez.runtime.library.ap
 public class POIdentityInOutTez extends POLocalRearrangeTez implements TezInput, TezOutput {
 
     private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POIdentityInOutTez.class);
     private String inputKey;
     private transient KeyValueReader reader;
     private transient KeyValuesReader shuffleReader;
@@ -92,6 +95,7 @@ public class POIdentityInOutTez extends 
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
             }
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r);
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -106,6 +110,7 @@ public class POIdentityInOutTez extends 
         }
         try {
             writer = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
         } catch (Exception e) {
             throw new ExecException(e);
         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Tue Apr 22 14:59:44 2014
@@ -48,7 +48,7 @@ public class POLocalRearrangeTez extends
     protected String outputKey;
     protected transient KeyValueWriter writer;
 
-    protected boolean isFRJoin = false;
+    protected boolean connectedToPackage = true;
     protected boolean isSkewedJoin = false;
 
     public POLocalRearrangeTez(OperatorKey k) {
@@ -64,6 +64,7 @@ public class POLocalRearrangeTez extends
         if (copy instanceof POLocalRearrangeTez) {
             POLocalRearrangeTez copyTez = (POLocalRearrangeTez) copy;
             this.isSkewedJoin = copyTez.isSkewedJoin;
+            this.connectedToPackage = copyTez.connectedToPackage;
             this.outputKey = copyTez.outputKey;
         }
     }
@@ -76,12 +77,12 @@ public class POLocalRearrangeTez extends
         this.outputKey = outputKey;
     }
 
-    public boolean isFRJoin() {
-        return isFRJoin;
+    public boolean isConnectedToPackage() {
+        return connectedToPackage;
     }
 
-    public void setFRJoin(boolean isFRJoin) {
-        this.isFRJoin = isFRJoin;
+    public void setConnectedToPackage(boolean connectedToPackage) {
+        this.connectedToPackage = connectedToPackage;
     }
 
     public boolean isSkewedJoin() {
@@ -186,6 +187,8 @@ public class POLocalRearrangeTez extends
                 mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(
                         mKey.scope)), requestedParallelism);
         deepCopyTo(clone);
+        clone.isSkewedJoin = isSkewedJoin;
+        clone.connectedToPackage = connectedToPackage;
         clone.setOutputKey(outputKey);
         return clone;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Tue Apr 22 14:59:44 2014
@@ -54,7 +54,7 @@ public class POShuffleTezLoad extends PO
 
     private WritableComparator comparator = null;
     private boolean isSkewedJoin = false;
-    
+
     private transient Configuration conf;
 
     public POShuffleTezLoad(POPackage pack) {
@@ -125,6 +125,7 @@ public class POShuffleTezLoad extends PO
                         hasData = true;
                         cur = readers.get(i).getCurrentKey();
                         if (min == null || comparator.compare(min, cur) > 0) {
+                            //Not a deep clone. Writable is referenced.
                             min = ((PigNullableWritable)cur).clone();
                             minIndex = i;
                         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java Tue Apr 22 14:59:44 2014
@@ -36,7 +36,9 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
 
 /**
  * POValueInputTez is used read tuples from a Tez Intermediate output from a 1-1
@@ -52,6 +54,9 @@ public class POValueInputTez extends Phy
     // TODO Change this to value only reader after implementing
     // value only input output
     private transient KeyValueReader reader;
+    private transient KeyValuesReader shuffleReader;
+    private transient boolean shuffleInput;
+    private transient boolean hasNext;
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     public POValueInputTez(OperatorKey k) {
@@ -83,9 +88,17 @@ public class POValueInputTez extends Phy
         if (input == null) {
             throw new ExecException("Input from vertex " + inputKey + " is missing");
         }
+
         try {
-            reader = (KeyValueReader) input.getReader();
-            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+            Reader r = input.getReader();
+            if (r instanceof KeyValueReader) {
+                reader = (KeyValueReader) r;
+            } else {
+                shuffleInput = true;
+                shuffleReader = (KeyValuesReader) r;
+                hasNext = shuffleReader.next();
+            }
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r);
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -97,20 +110,30 @@ public class POValueInputTez extends Phy
             if (finished) {
                 return RESULT_EOP;
             }
-            if (reader.next()) {
-                Tuple origTuple = (Tuple)reader.getCurrentValue();
-                Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-                return new Result(POStatus.STATUS_OK, copy);
+            if (shuffleInput) {
+                while (hasNext) {
+                    if (shuffleReader.getCurrentValues().iterator().hasNext()) {
+                        Tuple origTuple = (Tuple)shuffleReader.getCurrentValues().iterator().next();
+                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                    hasNext = shuffleReader.next();
+                }
             } else {
-                finished = true;
-                // For certain operators (such as STREAM), we could still have some work
-                // to do even after seeing the last input. These operators set a flag that
-                // says all input has been sent and to run the pipeline one more time.
-                if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
-                    this.parentPlan.endOfAllInput = true;
+                if (reader.next()) {
+                    Tuple origTuple = (Tuple)reader.getCurrentValue();
+                    Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                    return new Result(POStatus.STATUS_OK, copy);
                 }
-                return RESULT_EOP;
             }
+            finished = true;
+            // For certain operators (such as STREAM), we could still have some work
+            // to do even after seeing the last input. These operators set a flag that
+            // says all input has been sent and to run the pipeline one more time.
+            if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
+                this.parentPlan.endOfAllInput = true;
+            }
+            return RESULT_EOP;
         } catch (IOException e) {
             throw new ExecException(e);
         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Tue Apr 22 14:59:44 2014
@@ -99,6 +99,10 @@ public class POValueOutputTez extends Ph
         outputKeys.remove(outputKey);
     }
 
+    public boolean containsOutputKey(String outputKey) {
+        return outputKeys.contains(outputKey);
+    }
+
     @Override
     public Result getNextTuple() throws ExecException {
         Result inp;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Apr 22 14:59:44 2014
@@ -668,7 +668,7 @@ public class TezCompiler extends PhyPlan
                 if (!tezOp.isClosed()) {
                     POLocalRearrangeTez lr = new POLocalRearrangeTez(op.getLRs()[i]);
                     lr.setOutputKey(curTezOp.getOperatorKey().toString());
-                    lr.setFRJoin(true);
+                    lr.setConnectedToPackage(false);
 
                     tezOp.plan.addAsLeaf(lr);
                     TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
@@ -1518,8 +1518,9 @@ public class TezCompiler extends PhyPlan
             POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
             oper.plan.addAsLeaf(nfe1);
 
+            String numSamples = pigContext.getProperties().getProperty(PigConfiguration.PIG_RANDOM_SAMPLER_SAMPLE_SIZE, "100");
             POReservoirSample poSample = new POReservoirSample(new OperatorKey(scope,nig.getNextNodeId(scope)),
-                    -1, null, 100);
+                    -1, null, Integer.parseInt(numSamples));
             oper.plan.addAsLeaf(poSample);
             lrSample.setOutputKey(curTezOp.getOperatorKey().toString());
             oper.plan.addAsLeaf(lrSample);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Apr 22 14:59:44 2014
@@ -74,6 +74,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
@@ -112,8 +113,8 @@ import org.apache.tez.mapreduce.hadoop.M
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.SortedGroupedMergedInput;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
 
 /**
  * A visitor to construct DAG out of Tez plan.
@@ -223,8 +224,9 @@ public class TezDagBuilder extends TezOp
         EdgeProperty edgeProperty = newEdge(fromOp, toOp);
 
         String groupInputClass = edgeProperty.getDataMovementType().equals(
-                DataMovementType.SCATTER_GATHER) ? ConcatenatedMergedKeyValuesInput.class
-                .getName() : ConcatenatedMergedKeyValueInput.class.getName();
+                DataMovementType.SCATTER_GATHER)
+                        ? SortedGroupedMergedInput.class.getName()
+                        : ConcatenatedMergedKeyValueInput.class.getName();
         return new GroupInputEdge(from, to, edgeProperty,
                 new InputDescriptor(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
     }
@@ -457,10 +459,9 @@ public class TezDagBuilder extends TezOp
                     LinkedList<POLocalRearrangeTez> lrs =
                             PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
                     for (POLocalRearrangeTez lr : lrs) {
-                        if (lr.isFRJoin()) {
-                            // skip FR join input
-                        } else if (lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
-                            localRearrangeMap.put((int)lr.getIndex(), inputKey);
+                        if (lr.isConnectedToPackage()
+                                && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+                            localRearrangeMap.put((int) lr.getIndex(), inputKey);
                         }
                     }
                 }
@@ -507,12 +508,27 @@ public class TezDagBuilder extends TezOp
 
         MRToTezHelper.convertMRToTezRuntimeConf(payloadConf, globalConf);
 
+        if (!pc.inIllustrator) {
+            for (POStore store : stores) {
+                // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized
+                store.setInputs(null);
+                store.setParentPlan(null);
+            }
+            // We put them in the reduce because PigOutputCommitter checks the
+            // ID of the task to see if it's a map, and if not, calls the reduce
+            // committers.
+            payloadConf.set(JobControlCompiler.PIG_MAP_STORES,
+                    ObjectSerializer.serialize(new ArrayList<POStore>()));
+            payloadConf.set(JobControlCompiler.PIG_REDUCE_STORES,
+                    ObjectSerializer.serialize(stores));
+        }
+
         // Take our assembled configuration and create a vertex
         byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
         procDesc.setUserPayload(userPayload);
         // Can only set parallelism here if the parallelism isn't derived from
         // splits
-        int parallelism = Math.max(tezOp.getRequestedParallelism(), 1);
+        int parallelism = tezOp.getRequestedParallelism();
         InputSplitInfo inputSplitInfo = null;
         if (loads != null && loads.size() > 0) {
             // Not using MRInputAMSplitGenerator because delegation tokens are
@@ -523,6 +539,26 @@ public class TezDagBuilder extends TezOp
             parallelism = inputSplitInfo.getNumTasks();
             tezOp.setRequestedParallelism(parallelism);
         }
+        if (tezOp.getRequestedParallelism() < 0) {
+            if (pc.defaultParallel > 0) {
+                parallelism = pc.defaultParallel;
+            } else {
+                // Rough estimation till we have Automatic Reducer Parallelism
+                // and Parallelism estimator. To be removed.
+                int sumOfPredParallelism = 0;
+                int predParallelism;
+                for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
+                    predParallelism = pred.getRequestedParallelism();
+                    if (predParallelism < 0) {
+                        predParallelism = Math.max(pc.defaultParallel, 1);
+                    }
+                    sumOfPredParallelism += predParallelism;
+                }
+                sumOfPredParallelism = Math.min(sumOfPredParallelism, 20);
+                parallelism = Math.max(sumOfPredParallelism, 1);
+            }
+            tezOp.setRequestedParallelism(parallelism);
+        }
         Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, parallelism,
                 isMap ? MRHelpers.getMapResource(globalConf) : MRHelpers.getReduceResource(globalConf));
 
@@ -705,6 +741,12 @@ public class TezDagBuilder extends TezOp
                             JobControlCompiler.LOG_DIR).toString());
                 }
                 payloadConf.set("pig.streaming.task.output.dir", outputPathString);
+
+                if(tezOp.plan.getLeaves().get(0) instanceof POSplit) {
+                    // Set this so that we get correct counters
+                    st.setMultiStore(true);
+                }
+
             } else { // multi store case
                 log.info("Setting up multi store job");
                 String tmpLocationStr = FileLocalizer.getTemporaryPath(pc)
@@ -729,17 +771,6 @@ public class TezDagBuilder extends TezOp
                         tmpLocation.toString());
             }
 
-            if (!pc.inIllustrator) {
-
-                // We put them in the reduce because PigOutputCommitter checks the
-                // ID of the task to see if it's a map, and if not, calls the reduce
-                // committers.
-                payloadConf.set(JobControlCompiler.PIG_MAP_STORES,
-                        ObjectSerializer.serialize(new ArrayList<POStore>()));
-                payloadConf.set(JobControlCompiler.PIG_REDUCE_STORES,
-                        ObjectSerializer.serialize(stores));
-            }
-
         }
         return stores;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Tue Apr 22 14:59:44 2014
@@ -195,7 +195,7 @@ public class TezLauncher extends Launche
         }
 
         boolean isUnionOpt = "true".equalsIgnoreCase(pc.getProperties()
-                .getProperty(PigConfiguration.TEZ_OPT_UNION, "false"));
+                .getProperty(PigConfiguration.TEZ_OPT_UNION, "true"));
         // Use VertexGroup in Tez
         if (isUnionOpt) {
             UnionOptimizer uo = new UnionOptimizer(tezPlan);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Tue Apr 22 14:59:44 2014
@@ -415,6 +415,10 @@ public class TezOperator extends Operato
             this.inputKeys.add(input);
         }
 
+        public boolean removeInput(OperatorKey input) {
+            return this.inputKeys.remove(input);
+        }
+
         public String getOutput() {
             return outputKey;
         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Tue Apr 22 14:59:44 2014
@@ -22,10 +22,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.RoundRobinPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezOpPlanVisitor;
@@ -36,6 +40,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 /**
  * Optimizes union by removing the intermediate union vertex and making the
@@ -66,6 +73,7 @@ public class UnionOptimizer extends TezO
         }
 
         TezOperator unionOp = tezOp;
+        String unionOpKey = unionOp.getOperatorKey().toString();
         String scope = unionOp.getOperatorKey().scope;
         TezOperPlan tezPlan = getPlan();
 
@@ -124,16 +132,28 @@ public class UnionOptimizer extends TezO
             for (OperatorKey predKey : unionOp.getVertexGroupMembers()) {
                 TezOperator pred = tezPlan.getOperator(predKey);
                 PhysicalPlan predPlan = pred.plan;
-                // Remove POValueOutputTez from predecessor leaf
-                predPlan.remove(predPlan.getLeaves().get(0));
-
                 PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
+                // if predLeaf not POValueOutputTez
+                if (predLeaf instanceof POSplit) {
+                    // Find the subPlan that connects to the union operator
+                    predPlan = getUnionPredPlanFromSplit(predPlan, unionOpKey);
+                    predLeaf = predPlan.getLeaves().get(0);
+                }
+
                 PhysicalPlan clonePlan = unionOpPlan.clone();
                 //Clone changes the operator keys
                 List<POStoreTez> clonedUnionStoreOutputs = PlanHelper.getPhysicalOperators(clonePlan, POStoreTez.class);
 
+                // Remove POValueOutputTez from predecessor leaf
+                predPlan.remove(predLeaf);
+                boolean isEmptyPlan = predPlan.isEmpty();
+                if (!isEmptyPlan) {
+                    predLeaf = predPlan.getLeaves().get(0);
+                }
                 predPlan.merge(clonePlan);
-                predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
+                if (!isEmptyPlan) {
+                    predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
+                }
 
                 // Connect predecessor to the storeVertexGroups
                 int i = 0;
@@ -153,25 +173,73 @@ public class UnionOptimizer extends TezO
                     tezPlan.connect(pred, outputVertexGroup);
                 }
 
+                copyOperatorProperties(pred, unionOp);
                 tezPlan.disconnect(pred, unionOp);
             }
 
+            List<TezOperator> successors = tezPlan.getSuccessors(unionOp);
+            List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
+            for (TezOutput tezOutput : unionOutputs) {
+                if (tezOutput instanceof POValueOutputTez) {
+                    valueOnlyOutputs.add(tezOutput);
+                }
+            }
+            // Connect to outputVertexGroupOps
             // Copy output edges of union -> successor to predecessor->successor, vertexgroup -> successor
             // and connect vertexgroup -> successor in the plan.
             for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) {
                 TezOperator succOp = tezPlan.getOperator(entry.getKey());
+                // Case of union followed by union.
+                // unionOp.outEdges will not point to vertex group, but to its output.
+                // So find the vertex group if there is one.
+                TezOperator succOpVertexGroup = null;
+                for (TezOperator succ : successors) {
+                    if (succ.isVertexGroup()
+                            && succ.getVertexGroupInfo().getOutput()
+                                    .equals(succOp.getOperatorKey().toString())) {
+                        succOpVertexGroup = succ;
+                        break;
+                    }
+                }
+                TezEdgeDescriptor edge = entry.getValue();
+                // Edge cannot be one to one as it will get input from two or
+                // more union predecessors. Change it to SCATTER_GATHER
+                if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
+                    edge.dataMovementType = DataMovementType.SCATTER_GATHER;
+                    edge.partitionerClass = RoundRobinPartitioner.class;
+                    edge.outputClassName = OnFileSortedOutput.class.getName();
+                    edge.inputClassName = ShuffledMergedInput.class.getName();
+
+                    for (TezOutput tezOutput : valueOnlyOutputs) {
+                        if (ArrayUtils.contains(tezOutput.getTezOutputs(), entry.getKey().toString())) {
+                            edge.setIntermediateOutputKeyComparatorClass(
+                                    POValueOutputTez.EmptyWritableComparator.class.getName());
+                        }
+                    }
+                }
                 TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
-                // Required for create the Edge in TezDAGBuilder
                 for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
                     TezOperator pred = tezPlan.getOperator(predKey);
-                    pred.outEdges.put(entry.getKey(), entry.getValue());
-                    succOp.inEdges.put(predKey, entry.getValue());
+                    // Keep the output edge directly to successor
+                    // Don't need to keep output edge for vertexgroup
+                    pred.outEdges.put(entry.getKey(), edge);
+                    succOp.inEdges.put(predKey, edge);
+                    if (succOpVertexGroup != null) {
+                        succOpVertexGroup.getVertexGroupMembers().add(predKey);
+                        succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
+                        // Connect directly to the successor vertex group
+                        tezPlan.disconnect(pred, vertexGroupOp);
+                        tezPlan.connect(pred, succOpVertexGroup);
+                    }
+                }
+                if (succOpVertexGroup != null) {
+                    succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+                    succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
+                    //Discard the new vertex group created
+                    tezPlan.remove(vertexGroupOp);
+                } else {
+                    tezPlan.connect(vertexGroupOp, succOp);
                 }
-                // Not used in TezDAGBuilder. Just setting for correctness.
-                vertexGroupOp.outEdges.put(entry.getKey(), entry.getValue());
-                succOp.inEdges.put(vertexGroupOp.getOperatorKey(), entry.getValue());
-
-                tezPlan.connect(vertexGroupOp, succOp);
             }
         } catch (Exception e) {
             throw new VisitorException(e);
@@ -186,7 +254,7 @@ public class UnionOptimizer extends TezO
                 LinkedList<TezInput> inputs = PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
                 for (TezInput input : inputs) {
                     for (String key : input.getTezInputs()) {
-                        if (key.equals(unionOp.getOperatorKey().toString())) {
+                        if (key.equals(unionOpKey)) {
                             input.replaceInput(key,
                                     newOutputKeys[unionOutputKeys.indexOf(succ.getOperatorKey().toString())]);
                         }
@@ -201,4 +269,25 @@ public class UnionOptimizer extends TezO
 
     }
 
+    private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) {
+        pred.setUseSecondaryKey(unionOp.isUseSecondaryKey());
+        pred.UDFs.addAll(unionOp.UDFs);
+        pred.scalars.addAll(unionOp.scalars);
+    }
+
+    public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, String unionOpKey) throws VisitorException {
+        List<POSplit> splits = PlanHelper.getPhysicalOperators(plan, POSplit.class);
+        for (POSplit split : splits) {
+            for (PhysicalPlan subPlan : split.getPlans()) {
+                if (subPlan.getLeaves().get(0) instanceof POValueOutputTez) {
+                    POValueOutputTez out = (POValueOutputTez) subPlan.getLeaves().get(0);
+                    if (out.containsOutputKey(unionOpKey)) {
+                        return subPlan;
+                    }
+                }
+            }
+        }
+        throw new VisitorException("Did not find the union predecessor in the split plan");
+    }
+
 }

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Tue Apr 22 14:59:44 2014
@@ -92,14 +92,9 @@ public class TezTaskStats extends JobSta
             long hdfsBytesRead = -1;
             String filename = fs.getFileName();
             Map<String, Long> taskCounter = map.get(TASK_COUNTER_GROUP);
-            if (taskCounter != null) {
-                //TaskCounter.INPUT_RECORDS_PROCESSED.name()
-                if (taskCounter.get("INPUT_RECORDS_PROCESSED") != null) {
-                    records = taskCounter.get("INPUT_RECORDS_PROCESSED");
-                } else if (taskCounter.get(PigStatsUtil.MAP_INPUT_RECORDS) != null) {
-                    // Tez 0.3 has MAP_INPUT_RECORDS TODO: Remove after we move away from Tez 0.3
-                    records = taskCounter.get(PigStatsUtil.MAP_INPUT_RECORDS);
-                }
+            if (taskCounter != null
+                    && taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
+                records = taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
             }
             if (map.get(FS_COUNTER_GROUP) !=null &&
                     map.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
@@ -124,13 +119,9 @@ public class TezTaskStats extends JobSta
             if (sto.isMultiStore()) {
                 Long n = map.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP).get(PigStatsUtil.getMultiStoreCounterName(sto));
                 if (n != null) records = n;
-            } else if (map.get(TASK_COUNTER_GROUP) != null) {
-                if(map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
-                    records = map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
-                } else if(map.get(TASK_COUNTER_GROUP).get(PigStatsUtil.MAP_OUTPUT_RECORDS) != null) {
-                    // Tez 0.3 has MAP_OUTPUT_RECORDS TODO: Remove after we move away from Tez 0.3
-                    records = map.get(TASK_COUNTER_GROUP).get(PigStatsUtil.MAP_OUTPUT_RECORDS);
-                }
+            } else if (map.get(TASK_COUNTER_GROUP) != null
+                    && map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
+                records = map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
             }
             /*
             if (map.get(FS_COUNTER_GROUP)!= null &&

Modified: pig/branches/tez/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/nightly.conf?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/nightly.conf Tue Apr 22 14:59:44 2014
@@ -1360,6 +1360,101 @@ d = foreach b generate name, age;
 e = union c, d;
 store e into ':OUTPATH:';\,
 			},
+			{
+            'num' => 2,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = group c by name;
+e = foreach d generate group, SUM(c.age);
+store e into ':OUTPATH:';\,
+            },
+            {
+            'num' => 3,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = group c by name;
+e = foreach d { f = order c by $1,$2; generate group, f; };
+store e into ':OUTPATH:';\,
+            },
+            {
+            'num' => 4,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = order c by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+            },
+            {
+            'num' => 5,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+split a into a1 if age < 50, a2 otherwise;
+c = union a1, b;
+d = order c by name PARALLEL 2;
+store a2 into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+            },
+            {
+            'num' => 6,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join c by name, d by name PARALLEL 2;
+store e into ':OUTPATH:';\,
+            },
+            {
+            'num' => 7,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join c by name, d by name using 'replicated';
+store e into ':OUTPATH:';\,
+            },
+            {
+            'num' => 8,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by name, c by name using 'replicated';
+store e into ':OUTPATH:';\,
+            },
+            {
+            'num' => 9,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join c by name, d by name using 'skewed' PARALLEL 5;
+store e into ':OUTPATH:';\,
+            },
+            {
+            'num' => 10,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by name, c by name using 'skewed' PARALLEL 5;
+store e into ':OUTPATH:';\,
+            },
+            {
+            'num' => 11,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = foreach a generate name, age;
+e = foreach b generate name, age;
+f = foreach c generate name, age;
+g = union d, e;
+h = union f, g;
+i = group h by name;
+i = foreach i generate group, SUM(h.age);
+store i into ':OUTPATH:';\,
+            },
 		]
 		},
 		{

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld Tue Apr 22 14:59:44 2014
@@ -2,310 +2,307 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-412
+# TEZ DAG plan: scope-418
 #--------------------------------------------------
-Tez vertex scope-316	->	Tez vertex scope-318,Tez vertex scope-329,Tez vertex scope-340,
-Tez vertex scope-318	->	Tez vertex scope-321,Tez vertex scope-334,Tez vertex scope-345,
-Tez vertex scope-345	->	Tez vertex scope-348,Tez vertex scope-372,
-Tez vertex scope-348	->	Tez vertex scope-366,Tez vertex scope-356,
-Tez vertex scope-356	->	Tez vertex scope-366,
-Tez vertex scope-366	->	Tez vertex scope-368,
-Tez vertex scope-368
-Tez vertex scope-340	->	Tez vertex scope-343,Tez vertex scope-384,
-Tez vertex scope-384	->	Tez vertex scope-388,
-Tez vertex scope-372	->	Tez vertex scope-376,
-Tez vertex scope-376	->	Tez vertex scope-382,Tez vertex scope-386,
-Tez vertex scope-386	->	Tez vertex scope-388,
-Tez vertex scope-388
-Tez vertex scope-329	->	Tez vertex scope-332,Tez vertex scope-337,
+Tez vertex scope-319	->	Tez vertex scope-321,Tez vertex scope-332,Tez vertex scope-343,
+Tez vertex scope-332	->	Tez vertex scope-335,Tez vertex scope-340,
+Tez vertex scope-335	->	Tez vertex scope-339,
+Tez vertex scope-321	->	Tez vertex scope-324,Tez vertex scope-337,Tez vertex scope-348,
 Tez vertex scope-337	->	Tez vertex scope-339,
 Tez vertex scope-339
-Tez vertex scope-382
-Tez vertex scope-321	->	Tez vertex scope-323,
-Tez vertex scope-323	->	Tez vertex scope-325,Tez vertex scope-327,
-Tez vertex scope-327
-Tez vertex scope-325
-Tez vertex scope-343
-Tez vertex scope-332	->	Tez vertex scope-336,
-Tez vertex scope-334	->	Tez vertex scope-336,
-Tez vertex scope-336
+Tez vertex scope-348	->	Tez vertex scope-351,Tez vertex scope-375,
+Tez vertex scope-375	->	Tez vertex scope-379,
+Tez vertex scope-379	->	Tez vertex scope-385,Tez vertex scope-389,
+Tez vertex scope-385
+Tez vertex scope-343	->	Tez vertex scope-346,Tez vertex scope-387,
+Tez vertex scope-387	->	Tez vertex scope-415,
+Tez vertex scope-389	->	Tez vertex scope-415,
+Tez vertex scope-415
+Tez vertex scope-346
+Tez vertex scope-340	->	Tez vertex scope-342,
+Tez vertex scope-342
+Tez vertex scope-324	->	Tez vertex scope-326,
+Tez vertex scope-326	->	Tez vertex scope-328,Tez vertex scope-330,
+Tez vertex scope-328
+Tez vertex scope-351	->	Tez vertex scope-369,Tez vertex scope-359,
+Tez vertex scope-359	->	Tez vertex scope-369,
+Tez vertex scope-369	->	Tez vertex scope-371,
+Tez vertex scope-371
+Tez vertex scope-330
 
-Tez vertex scope-316
+Tez vertex scope-319
 # Plan on vertex
-POValueOutputTez - scope-317	->	 [scope-340, scope-318, scope-329]
+POValueOutputTez - scope-320	->	 [scope-321, scope-332, scope-343]
 |
-|---a: New For Each(false,false)[bag] - scope-217
+|---a: New For Each(false,false)[bag] - scope-220
     |   |
-    |   Cast[int] - scope-212
+    |   Cast[int] - scope-215
     |   |
-    |   |---Project[bytearray][0] - scope-211
+    |   |---Project[bytearray][0] - scope-214
     |   |
-    |   Cast[int] - scope-215
+    |   Cast[int] - scope-218
     |   |
-    |   |---Project[bytearray][1] - scope-214
+    |   |---Project[bytearray][1] - scope-217
     |
-    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-210
-Tez vertex scope-318
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-213
+Tez vertex scope-332
 # Plan on vertex
-POValueOutputTez - scope-320	->	 [scope-321, scope-345, scope-334]
+POValueOutputTez - scope-334	->	 [scope-340, scope-335]
 |
-|---b: Filter[bag] - scope-219
+|---c: Filter[bag] - scope-247
     |   |
-    |   Less Than or Equal[boolean] - scope-222
+    |   Less Than or Equal[boolean] - scope-250
     |   |
-    |   |---Project[int][0] - scope-220
+    |   |---Project[int][0] - scope-248
     |   |
-    |   |---Constant(5) - scope-221
+    |   |---Constant(10) - scope-249
     |
-    |---POValueInputTez - scope-319	<-	 scope-316
-Tez vertex scope-345
+    |---POValueInputTez - scope-333	<-	 scope-319
+Tez vertex scope-335
 # Plan on vertex
-POValueOutputTez - scope-347	->	 [scope-348, scope-372]
-|
-|---POValueInputTez - scope-346	<-	 scope-318
-Tez vertex scope-348
-# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-352	->	 scope-356
+c1: Local Rearrange[tuple]{int}(false) - scope-260	->	 scope-339
 |   |
-|   Constant(DummyVal) - scope-351
+|   Project[int][0] - scope-261
 |
-|---ReservoirSample - scope-355
-    |
-    |---New For Each(false)[tuple] - scope-354
-        |   |
-        |   Project[int][0] - scope-353
-        |
-        |---e1: Local Rearrange[tuple]{int}(false) - scope-350	->	 scope-366
-            |   |
-            |   Project[int][0] - scope-298
-            |
-            |---e: Filter[bag] - scope-294
-                |   |
-                |   Less Than[boolean] - scope-297
-                |   |
-                |   |---Project[int][0] - scope-295
-                |   |
-                |   |---Constant(3) - scope-296
-                |
-                |---POValueInputTez - scope-349	<-	 scope-345
-Tez vertex scope-356
+|---POValueInputTez - scope-336	<-	 scope-332
+Tez vertex scope-321
 # Plan on vertex
-POValueOutputTez - scope-365	->	 [scope-366]
+POValueOutputTez - scope-323	->	 [scope-348, scope-337, scope-324]
 |
-|---New For Each(false)[tuple] - scope-364
+|---b: Filter[bag] - scope-222
+    |   |
+    |   Less Than or Equal[boolean] - scope-225
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-363
+    |   |---Project[int][0] - scope-223
     |   |
-    |   |---Project[tuple][*] - scope-362
+    |   |---Constant(5) - scope-224
     |
-    |---New For Each(false,false)[tuple] - scope-361
-        |   |
-        |   Constant(1) - scope-360
-        |   |
-        |   Project[bag][1] - scope-358
-        |
-        |---Package(Packager)[tuple]{bytearray} - scope-357
-Tez vertex scope-366
+    |---POValueInputTez - scope-322	<-	 scope-319
+Tez vertex scope-337
 # Plan on vertex
-POIdentityInOutTez - scope-367	<-	 scope-348	->	 scope-368
+c1: Local Rearrange[tuple]{int}(false) - scope-262	->	 scope-339
 |   |
-|   Project[int][0] - scope-298
-Tez vertex scope-368
-# Plan on vertex
-e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-300
+|   Project[int][0] - scope-263
 |
-|---New For Each(true)[tuple] - scope-371
-    |   |
-    |   Project[bag][1] - scope-370
-    |
-    |---Package(LitePackager)[tuple]{int} - scope-369
-Tez vertex scope-340
+|---POValueInputTez - scope-338	<-	 scope-321
+Tez vertex scope-339
 # Plan on vertex
-POValueOutputTez - scope-342	->	 [scope-384, scope-343]
+c1: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-267
 |
-|---d1: Filter[bag] - scope-283
-    |   |
-    |   Equal To[boolean] - scope-286
+|---c1: New For Each(true,true)[tuple] - scope-266
     |   |
-    |   |---Project[int][0] - scope-284
+    |   Project[bag][1] - scope-264
     |   |
-    |   |---Constant(5) - scope-285
+    |   Project[bag][2] - scope-265
     |
-    |---d: Filter[bag] - scope-279
-        |   |
-        |   Greater Than[boolean] - scope-282
-        |   |
-        |   |---Project[int][0] - scope-280
-        |   |
-        |   |---Constant(10) - scope-281
-        |
-        |---POValueInputTez - scope-341	<-	 scope-316
-Tez vertex scope-384
+    |---c1: Package(Packager)[tuple]{int} - scope-259
+Tez vertex scope-348
 # Plan on vertex
-POValueOutputTez - scope-390	->	 [scope-388]
+POValueOutputTez - scope-350	->	 [scope-375, scope-351]
 |
-|---POValueInputTez - scope-385	<-	 scope-340
-Tez vertex scope-372
+|---POValueInputTez - scope-349	<-	 scope-321
+Tez vertex scope-375
 # Plan on vertex
-f1: Local Rearrange[tuple]{tuple}(false) - scope-375	->	 scope-376
+f1: Local Rearrange[tuple]{tuple}(false) - scope-378	->	 scope-379
 |   |
-|   Project[tuple][*] - scope-374
+|   Project[tuple][*] - scope-377
 |
-|---f1: Limit - scope-305
+|---f1: Limit - scope-308
     |
-    |---f: Filter[bag] - scope-301
+    |---f: Filter[bag] - scope-304
         |   |
-        |   Greater Than or Equal[boolean] - scope-304
+        |   Greater Than or Equal[boolean] - scope-307
         |   |
-        |   |---Project[int][0] - scope-302
+        |   |---Project[int][0] - scope-305
         |   |
-        |   |---Constant(3) - scope-303
+        |   |---Constant(3) - scope-306
         |
-        |---POValueInputTez - scope-373	<-	 scope-345
-Tez vertex scope-376
+        |---POValueInputTez - scope-376	<-	 scope-348
+Tez vertex scope-379
 # Plan on vertex
-POValueOutputTez - scope-381	->	 [scope-386, scope-382]
+POValueOutputTez - scope-384	->	 [scope-389, scope-385]
 |
-|---f1: Limit - scope-380
+|---f1: Limit - scope-383
     |
-    |---f1: New For Each(true)[bag] - scope-379
+    |---f1: New For Each(true)[bag] - scope-382
         |   |
-        |   Project[tuple][1] - scope-378
+        |   Project[tuple][1] - scope-381
         |
-        |---f1: Package(Packager)[tuple]{tuple} - scope-377
-Tez vertex scope-386
+        |---f1: Package(Packager)[tuple]{tuple} - scope-380
+Tez vertex scope-385
 # Plan on vertex
-POValueOutputTez - scope-391	->	 [scope-388]
+f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-312
 |
-|---POValueInputTez - scope-387	<-	 scope-376
-Tez vertex scope-388
-# Plan on vertex
-f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-315
-|
-|---POShuffledValueInputTez - scope-389	<-	 [scope-386, scope-384]
-Tez vertex scope-329
+|---POValueInputTez - scope-386	<-	 scope-379
+Tez vertex scope-343
 # Plan on vertex
-POValueOutputTez - scope-331	->	 [scope-337, scope-332]
+POValueOutputTez - scope-345	->	 [scope-387, scope-346]
 |
-|---c: Filter[bag] - scope-244
+|---d1: Filter[bag] - scope-286
     |   |
-    |   Less Than or Equal[boolean] - scope-247
+    |   Equal To[boolean] - scope-289
     |   |
-    |   |---Project[int][0] - scope-245
+    |   |---Project[int][0] - scope-287
     |   |
-    |   |---Constant(10) - scope-246
+    |   |---Constant(5) - scope-288
     |
-    |---POValueInputTez - scope-330	<-	 scope-316
-Tez vertex scope-337
+    |---d: Filter[bag] - scope-282
+        |   |
+        |   Greater Than[boolean] - scope-285
+        |   |
+        |   |---Project[int][0] - scope-283
+        |   |
+        |   |---Constant(10) - scope-284
+        |
+        |---POValueInputTez - scope-344	<-	 scope-319
+Tez vertex scope-387
 # Plan on vertex
-c2: Local Rearrange[tuple]{int}(false) - scope-404	->	 scope-339
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-416
+|
+|---POValueInputTez - scope-388	<-	 scope-343
+Tez vertex scope-389
+# Plan on vertex
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-417
+|
+|---POValueInputTez - scope-390	<-	 scope-379
+Tez vertex group scope-415	<-	 [scope-387, scope-389]	->	 null
+# No plan on vertex group
+Tez vertex scope-346
+# Plan on vertex
+d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-293
+|
+|---POValueInputTez - scope-347	<-	 scope-343
+Tez vertex scope-340
+# Plan on vertex
+c2: Local Rearrange[tuple]{int}(false) - scope-407	->	 scope-342
 |   |
-|   Project[int][0] - scope-406
+|   Project[int][0] - scope-409
 |
-|---c3: New For Each(false,false)[bag] - scope-392
+|---c3: New For Each(false,false)[bag] - scope-395
     |   |
-    |   Project[int][0] - scope-393
+    |   Project[int][0] - scope-396
     |   |
-    |   POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-394
+    |   POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-397
     |   |
-    |   |---Project[bag][0] - scope-395
+    |   |---Project[bag][0] - scope-398
     |       |
-    |       |---Project[bag][1] - scope-396
+    |       |---Project[bag][1] - scope-399
     |
-    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-407
+    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-410
         |
-        |---POValueInputTez - scope-338	<-	 scope-329
-Tez vertex scope-339
-# Combine plan on edge <scope-337>
-c2: Local Rearrange[tuple]{int}(false) - scope-408	->	 scope-339
+        |---POValueInputTez - scope-341	<-	 scope-332
+Tez vertex scope-342
+# Combine plan on edge <scope-340>
+c2: Local Rearrange[tuple]{int}(false) - scope-411	->	 scope-342
 |   |
-|   Project[int][0] - scope-410
+|   Project[int][0] - scope-413
 |
-|---c3: New For Each(false,false)[bag] - scope-397
+|---c3: New For Each(false,false)[bag] - scope-400
     |   |
-    |   Project[int][0] - scope-398
+    |   Project[int][0] - scope-401
     |   |
-    |   POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-399
+    |   POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-402
     |   |
-    |   |---Project[bag][1] - scope-400
+    |   |---Project[bag][1] - scope-403
     |
-    |---c2: Package(CombinerPackager)[tuple]{int} - scope-403
+    |---c2: Package(CombinerPackager)[tuple]{int} - scope-406
 # Plan on vertex
-c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-278
+c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-281
 |
-|---c3: New For Each(false,false)[bag] - scope-277
+|---c3: New For Each(false,false)[bag] - scope-280
     |   |
-    |   Project[int][0] - scope-271
+    |   Project[int][0] - scope-274
     |   |
-    |   POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-275
+    |   POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-278
     |   |
-    |   |---Project[bag][1] - scope-401
+    |   |---Project[bag][1] - scope-404
     |
-    |---c2: Package(CombinerPackager)[tuple]{int} - scope-268
-Tez vertex scope-382
-# Plan on vertex
-f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-309
-|
-|---POValueInputTez - scope-383	<-	 scope-376
-Tez vertex scope-321
+    |---c2: Package(CombinerPackager)[tuple]{int} - scope-271
+Tez vertex scope-324
 # Plan on vertex
-b1: Local Rearrange[tuple]{int}(false) - scope-228	->	 scope-323
+b1: Local Rearrange[tuple]{int}(false) - scope-231	->	 scope-326
 |   |
-|   Project[int][0] - scope-229
+|   Project[int][0] - scope-232
 |
-|---POValueInputTez - scope-322	<-	 scope-318
-Tez vertex scope-323
+|---POValueInputTez - scope-325	<-	 scope-321
+Tez vertex scope-326
 # Plan on vertex
-POValueOutputTez - scope-324	->	 [scope-327, scope-325]
+POValueOutputTez - scope-327	->	 [scope-330, scope-328]
 |
-|---b1: Package(Packager)[tuple]{int} - scope-227
-Tez vertex scope-327
+|---b1: Package(Packager)[tuple]{int} - scope-230
+Tez vertex scope-328
 # Plan on vertex
-b2: Store(file:///tmp/output/b2:org.apache.pig.builtin.PigStorage) - scope-243
+b1: Store(file:///tmp/output/b1:org.apache.pig.builtin.PigStorage) - scope-236
 |
-|---b2: New For Each(false,false)[bag] - scope-242
-    |   |
-    |   Project[int][0] - scope-236
-    |   |
-    |   POUserFunc(org.apache.pig.builtin.LongSum)[long] - scope-240
-    |   |
-    |   |---Project[bag][0] - scope-239
-    |       |
-    |       |---Project[bag][1] - scope-238
-    |
-    |---POValueInputTez - scope-328	<-	 scope-323
-Tez vertex scope-325
+|---POValueInputTez - scope-329	<-	 scope-326
+Tez vertex scope-351
 # Plan on vertex
-b1: Store(file:///tmp/output/b1:org.apache.pig.builtin.PigStorage) - scope-233
+Local Rearrange[tuple]{tuple}(false) - scope-355	->	 scope-359
+|   |
+|   Constant(DummyVal) - scope-354
 |
-|---POValueInputTez - scope-326	<-	 scope-323
-Tez vertex scope-343
+|---ReservoirSample - scope-358
+    |
+    |---New For Each(false)[tuple] - scope-357
+        |   |
+        |   Project[int][0] - scope-356
+        |
+        |---e1: Local Rearrange[tuple]{int}(false) - scope-353	->	 scope-369
+            |   |
+            |   Project[int][0] - scope-301
+            |
+            |---e: Filter[bag] - scope-297
+                |   |
+                |   Less Than[boolean] - scope-300
+                |   |
+                |   |---Project[int][0] - scope-298
+                |   |
+                |   |---Constant(3) - scope-299
+                |
+                |---POValueInputTez - scope-352	<-	 scope-348
+Tez vertex scope-359
 # Plan on vertex
-d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-290
+POValueOutputTez - scope-368	->	 [scope-369]
 |
-|---POValueInputTez - scope-344	<-	 scope-340
-Tez vertex scope-332
+|---New For Each(false)[tuple] - scope-367
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-366
+    |   |
+    |   |---Project[tuple][*] - scope-365
+    |
+    |---New For Each(false,false)[tuple] - scope-364
+        |   |
+        |   Constant(1) - scope-363
+        |   |
+        |   Project[bag][1] - scope-361
+        |
+        |---Package(Packager)[tuple]{bytearray} - scope-360
+Tez vertex scope-369
 # Plan on vertex
-c1: Local Rearrange[tuple]{int}(false) - scope-257	->	 scope-336
+POIdentityInOutTez - scope-370	<-	 scope-351	->	 scope-371
 |   |
-|   Project[int][0] - scope-258
-|
-|---POValueInputTez - scope-333	<-	 scope-329
-Tez vertex scope-334
+|   Project[int][0] - scope-301
+Tez vertex scope-371
 # Plan on vertex
-c1: Local Rearrange[tuple]{int}(false) - scope-259	->	 scope-336
-|   |
-|   Project[int][0] - scope-260
+e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-303
 |
-|---POValueInputTez - scope-335	<-	 scope-318
-Tez vertex scope-336
+|---New For Each(true)[tuple] - scope-374
+    |   |
+    |   Project[bag][1] - scope-373
+    |
+    |---Package(LitePackager)[tuple]{int} - scope-372
+Tez vertex scope-330
 # Plan on vertex
-c1: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-264
+b2: Store(file:///tmp/output/b2:org.apache.pig.builtin.PigStorage) - scope-246
 |
-|---c1: New For Each(true,true)[tuple] - scope-263
+|---b2: New For Each(false,false)[bag] - scope-245
     |   |
-    |   Project[bag][1] - scope-261
+    |   Project[int][0] - scope-239
     |   |
-    |   Project[bag][2] - scope-262
+    |   POUserFunc(org.apache.pig.builtin.LongSum)[long] - scope-243
+    |   |
+    |   |---Project[bag][0] - scope-242
+    |       |
+    |       |---Project[bag][1] - scope-241
     |
-    |---c1: Package(Packager)[tuple]{int} - scope-256
+    |---POValueInputTez - scope-331	<-	 scope-326

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld Tue Apr 22 14:59:44 2014
@@ -2,9 +2,11 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-209
+# TEZ DAG plan: scope-212
 #--------------------------------------------------
-Tez vertex scope-106	->	Tez vertex scope-119,Tez vertex scope-113,Tez vertex scope-126,Tez vertex scope-156,Tez vertex scope-146,Tez vertex scope-166,Tez vertex scope-178,
+Tez vertex scope-106	->	Tez vertex scope-119,Tez vertex scope-113,Tez vertex scope-126,Tez vertex scope-156,Tez vertex scope-146,Tez vertex scope-166,Tez vertex scope-209,
+Tez vertex scope-166	->	Tez vertex scope-209,
+Tez vertex scope-209
 Tez vertex scope-113
 Tez vertex scope-119	->	Tez vertex scope-126,Tez vertex scope-129,
 Tez vertex scope-126
@@ -12,8 +14,6 @@ Tez vertex scope-129
 Tez vertex scope-146	->	Tez vertex scope-156,
 Tez vertex scope-156	->	Tez vertex scope-158,
 Tez vertex scope-158
-Tez vertex scope-166	->	Tez vertex scope-178,
-Tez vertex scope-178
 
 Tez vertex scope-106
 # Plan on vertex
@@ -79,7 +79,7 @@ Tez vertex scope-106
 |   |   |
 |   |   d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-80
 |   |   |
-|   |   POValueOutputTez - scope-180	->	 [scope-178]
+|   |   f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-210
 |   |
 |   |---d1: Filter[bag] - scope-73
 |       |   |
@@ -110,6 +110,23 @@ Tez vertex scope-106
     |   |---Project[bytearray][1] - scope-4
     |
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-166
+# Plan on vertex
+f1: Split - scope-203
+|   |
+|   f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
+|   |
+|   f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-211
+|
+|---f1: Limit - scope-170
+    |
+    |---f1: New For Each(true)[bag] - scope-169
+        |   |
+        |   Project[tuple][1] - scope-168
+        |
+        |---f1: Package(Packager)[tuple]{tuple} - scope-167
+Tez vertex group scope-209	<-	 [scope-106, scope-166]	->	 null
+# No plan on vertex group
 Tez vertex scope-113
 # Plan on vertex
 b1: Split - scope-202
@@ -231,23 +248,3 @@ e1: Store(file:///tmp/output/e1:org.apac
     |   Project[bag][1] - scope-160
     |
     |---Package(LitePackager)[tuple]{int} - scope-159
-Tez vertex scope-166
-# Plan on vertex
-f1: Split - scope-203
-|   |
-|   f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
-|   |
-|   POValueOutputTez - scope-181	->	 [scope-178]
-|
-|---f1: Limit - scope-170
-    |
-    |---f1: New For Each(true)[bag] - scope-169
-        |   |
-        |   Project[tuple][1] - scope-168
-        |
-        |---f1: Package(Packager)[tuple]{tuple} - scope-167
-Tez vertex scope-178
-# Plan on vertex
-f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
-|
-|---POShuffledValueInputTez - scope-179	<-	 [scope-106, scope-166]

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld?rev=1589152&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld Tue Apr 22 14:59:44 2014
@@ -0,0 +1,75 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-43
+#--------------------------------------------------
+Tez vertex scope-31	->	Tez vertex scope-33,
+Tez vertex scope-32	->	Tez vertex scope-33,
+Tez vertex scope-33	->	Tez vertex scope-38,
+Tez vertex scope-37	->	Tez vertex scope-38,
+Tez vertex scope-38	->	Tez vertex scope-42,
+Tez vertex scope-42
+
+Tez vertex scope-31
+# Plan on vertex
+POValueOutputTez - scope-35	->	 [scope-33]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[chararray] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-32
+# Plan on vertex
+POValueOutputTez - scope-36	->	 [scope-33]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][1] - scope-9
+    |   |
+    |   Cast[chararray] - scope-13
+    |   |
+    |   |---Project[bytearray][0] - scope-12
+    |
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-33
+# Plan on vertex
+POValueOutputTez - scope-40	->	 [scope-38]
+|
+|---POShuffledValueInputTez - scope-34	<-	 [scope-31, scope-32]
+Tez vertex scope-37
+# Plan on vertex
+POValueOutputTez - scope-41	->	 [scope-38]
+|
+|---d: New For Each(false,false)[bag] - scope-24
+    |   |
+    |   Cast[int] - scope-19
+    |   |
+    |   |---Project[bytearray][0] - scope-18
+    |   |
+    |   Cast[chararray] - scope-22
+    |   |
+    |   |---Project[bytearray][1] - scope-21
+    |
+    |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-38
+# Plan on vertex
+f: Local Rearrange[tuple]{int}(false) - scope-28	->	 scope-42
+|   |
+|   Project[int][0] - scope-29
+|
+|---POShuffledValueInputTez - scope-39	<-	 [scope-37, scope-33]
+Tez vertex scope-42
+# Plan on vertex
+f: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-30
+|
+|---f: Package(Packager)[tuple]{int} - scope-27

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld?rev=1589152&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld Tue Apr 22 14:59:44 2014
@@ -0,0 +1,70 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-53
+#--------------------------------------------------
+Tez vertex scope-37	->	Tez vertex scope-43,
+Tez vertex scope-31	->	Tez vertex scope-43,
+Tez vertex scope-32	->	Tez vertex scope-43,
+Tez vertex scope-43	->	Tez vertex scope-42,
+Tez vertex scope-42
+
+Tez vertex scope-37
+# Plan on vertex
+f: Local Rearrange[tuple]{int}(false) - scope-46	->	 scope-42
+|   |
+|   Project[int][0] - scope-47
+|
+|---d: New For Each(false,false)[bag] - scope-24
+    |   |
+    |   Cast[int] - scope-19
+    |   |
+    |   |---Project[bytearray][0] - scope-18
+    |   |
+    |   Cast[chararray] - scope-22
+    |   |
+    |   |---Project[bytearray][1] - scope-21
+    |
+    |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-31
+# Plan on vertex
+f: Local Rearrange[tuple]{int}(false) - scope-49	->	 scope-42
+|   |
+|   Project[int][0] - scope-50
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[chararray] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-32
+# Plan on vertex
+f: Local Rearrange[tuple]{int}(false) - scope-51	->	 scope-42
+|   |
+|   Project[int][0] - scope-52
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][1] - scope-9
+    |   |
+    |   Cast[chararray] - scope-13
+    |   |
+    |   |---Project[bytearray][0] - scope-12
+    |
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex group scope-43	<-	 [scope-37, scope-31, scope-32]	->	 scope-42
+# No plan on vertex group
+Tez vertex scope-42
+# Plan on vertex
+f: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-30
+|
+|---f: Package(Packager)[tuple]{int} - scope-27

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld Tue Apr 22 14:59:44 2014
@@ -70,20 +70,6 @@ d: Local Rearrange[tuple]{int}(false) - 
 Tez vertex group scope-55	<-	 [scope-29, scope-30]	->	 scope-35
 # No plan on vertex group
 Tez vertex scope-35
-# Combine plan on edge <scope-55>
-d: Local Rearrange[tuple]{int}(false) - scope-52	->	 scope-35
-|   |
-|   Project[int][0] - scope-54
-|
-|---e: New For Each(false,false)[bag] - scope-41
-    |   |
-    |   Project[int][0] - scope-42
-    |   |
-    |   POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-43
-    |   |
-    |   |---Project[bag][1] - scope-44
-    |
-    |---d: Package(CombinerPackager)[tuple]{int} - scope-47
 # Combine plan on edge <scope-30>
 d: Local Rearrange[tuple]{int}(false) - scope-52	->	 scope-35
 |   |

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld Tue Apr 22 14:59:44 2014
@@ -2,114 +2,114 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-147
+# TEZ DAG plan: scope-149
 #--------------------------------------------------
-Tez vertex scope-113	->	Tez vertex scope-115,
-Tez vertex scope-114	->	Tez vertex scope-115,
-Tez vertex scope-115	->	Tez vertex scope-137,Tez vertex scope-127,
-Tez vertex scope-127	->	Tez vertex scope-137,Tez vertex scope-119,
-Tez vertex scope-137	->	Tez vertex scope-141,
-Tez vertex scope-119	->	Tez vertex scope-141,
-Tez vertex scope-141
+Tez vertex scope-115	->	Tez vertex scope-117,
+Tez vertex scope-116	->	Tez vertex scope-117,
+Tez vertex scope-117	->	Tez vertex scope-139,Tez vertex scope-129,
+Tez vertex scope-129	->	Tez vertex scope-139,Tez vertex scope-121,
+Tez vertex scope-139	->	Tez vertex scope-143,
+Tez vertex scope-121	->	Tez vertex scope-143,
+Tez vertex scope-143
 
-Tez vertex scope-113
+Tez vertex scope-115
 # Plan on vertex
-POValueOutputTez - scope-117	->	 [scope-115]
+POValueOutputTez - scope-119	->	 [scope-117]
 |
-|---a: New For Each(false,false)[bag] - scope-91
+|---a: New For Each(false,false)[bag] - scope-93
     |   |
-    |   Cast[int] - scope-86
+    |   Cast[int] - scope-88
     |   |
-    |   |---Project[bytearray][0] - scope-85
+    |   |---Project[bytearray][0] - scope-87
     |   |
-    |   Cast[chararray] - scope-89
+    |   Cast[chararray] - scope-91
     |   |
-    |   |---Project[bytearray][1] - scope-88
+    |   |---Project[bytearray][1] - scope-90
     |
-    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-84
-Tez vertex scope-114
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-86
+Tez vertex scope-116
 # Plan on vertex
-POValueOutputTez - scope-118	->	 [scope-115]
+POValueOutputTez - scope-120	->	 [scope-117]
 |
-|---c: New For Each(false,false)[bag] - scope-99
+|---c: New For Each(false,false)[bag] - scope-101
     |   |
-    |   Cast[int] - scope-94
+    |   Cast[int] - scope-96
     |   |
-    |   |---Project[bytearray][1] - scope-93
+    |   |---Project[bytearray][1] - scope-95
     |   |
-    |   Cast[chararray] - scope-97
+    |   Cast[chararray] - scope-99
     |   |
-    |   |---Project[bytearray][0] - scope-96
+    |   |---Project[bytearray][0] - scope-98
     |
-    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-92
-Tez vertex scope-115
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-94
+Tez vertex scope-117
 # Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-122	->	 scope-127
+Local Rearrange[tuple]{tuple}(false) - scope-124	->	 scope-129
 |   |
-|   Constant(DummyVal) - scope-121
+|   Constant(DummyVal) - scope-123
 |
-|---New For Each(true,true)[tuple] - scope-126
+|---New For Each(true,true)[tuple] - scope-128
     |   |
-    |   Project[int][0] - scope-109
+    |   Project[int][0] - scope-111
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-125
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-127
     |   |
-    |   |---Project[tuple][*] - scope-124
+    |   |---Project[tuple][*] - scope-126
     |
-    |---PoissonSample - scope-123
+    |---PoissonSample - scope-125
         |
-        |---Local Rearrange[tuple]{int}(false) - scope-120	->	 scope-137
+        |---Local Rearrange[tuple]{int}(false) - scope-122	->	 scope-139
             |   |
-            |   Project[int][0] - scope-109
+            |   Project[int][0] - scope-111
             |
-            |---POShuffledValueInputTez - scope-116	<-	 [scope-114, scope-113]
-Tez vertex scope-127
+            |---POShuffledValueInputTez - scope-118	<-	 [scope-116, scope-115]
+Tez vertex scope-129
 # Plan on vertex
-POValueOutputTez - scope-136	->	 [scope-137, scope-119]
+POValueOutputTez - scope-138	->	 [scope-139, scope-121]
 |
-|---New For Each(false)[tuple] - scope-135
+|---New For Each(false)[tuple] - scope-137
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-134
+    |   POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-136
     |   |
-    |   |---Project[tuple][*] - scope-133
+    |   |---Project[tuple][*] - scope-135
     |
-    |---New For Each(false,false)[tuple] - scope-132
+    |---New For Each(false,false)[tuple] - scope-134
         |   |
-        |   Constant(1) - scope-131
+        |   Constant(1) - scope-133
         |   |
-        |   Project[bag][1] - scope-129
+        |   Project[bag][1] - scope-131
         |
-        |---Package(Packager)[tuple]{bytearray} - scope-128
-Tez vertex scope-137
+        |---Package(Packager)[tuple]{bytearray} - scope-130
+Tez vertex scope-139
 # Plan on vertex
-POIdentityInOutTez - scope-138	<-	 scope-115	->	 scope-141
+POIdentityInOutTez - scope-140	<-	 scope-117	->	 scope-143
 |   |
-|   Project[int][0] - scope-109
-Tez vertex scope-119
+|   Project[int][0] - scope-111
+Tez vertex scope-121
 # Plan on vertex
-Partition Rearrange[tuple]{int}(false) - scope-139	->	 scope-141
+Partition Rearrange[tuple]{int}(false) - scope-141	->	 scope-143
 |   |
-|   Project[int][0] - scope-110
+|   Project[int][0] - scope-112
 |
-|---d: New For Each(false,false)[bag] - scope-108
+|---d: New For Each(false,false)[bag] - scope-110
     |   |
-    |   Cast[int] - scope-103
+    |   Cast[int] - scope-105
     |   |
-    |   |---Project[bytearray][0] - scope-102
+    |   |---Project[bytearray][0] - scope-104
     |   |
-    |   Cast[chararray] - scope-106
+    |   Cast[chararray] - scope-108
     |   |
-    |   |---Project[bytearray][1] - scope-105
+    |   |---Project[bytearray][1] - scope-107
     |
-    |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-101
-Tez vertex scope-141
+    |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-103
+Tez vertex scope-143
 # Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-112
+e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-114
 |
-|---New For Each(true,true)[tuple] - scope-145
+|---New For Each(true,true)[tuple] - scope-147
     |   |
-    |   Project[bag][1] - scope-143
+    |   Project[bag][1] - scope-145
     |   |
-    |   Project[bag][2] - scope-144
+    |   Project[bag][2] - scope-146
     |
-    |---Package(Packager)[tuple]{int} - scope-142
+    |---Package(Packager)[tuple]{int} - scope-144

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld?rev=1589152&r1=1589151&r2=1589152&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld Tue Apr 22 14:59:44 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-83
+# TEZ DAG plan: scope-85
 #--------------------------------------------------
 Tez vertex scope-29	->	Tez vertex scope-63,Tez vertex scope-64,
 Tez vertex scope-30	->	Tez vertex scope-63,Tez vertex scope-64,
@@ -19,15 +19,15 @@ Local Rearrange[tuple]{tuple}(false) - s
 |   |
 |   Constant(DummyVal) - scope-68
 |
-|---New For Each(true,true)[tuple] - scope-73
+|---New For Each(true,true)[tuple] - scope-74
     |   |
-    |   Project[int][0] - scope-70
+    |   Project[int][0] - scope-71
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-71
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-72
     |   |
-    |   |---Project[tuple][*] - scope-72
+    |   |---Project[tuple][*] - scope-73
     |
-    |---PoissonSample - scope-69
+    |---PoissonSample - scope-70
         |
         |---Local Rearrange[tuple]{int}(false) - scope-65	->	 scope-53
             |   |
@@ -46,23 +46,23 @@ Local Rearrange[tuple]{tuple}(false) - s
                 |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-30
 # Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-76	->	 scope-43
+Local Rearrange[tuple]{tuple}(false) - scope-77	->	 scope-43
 |   |
-|   Constant(DummyVal) - scope-77
+|   Constant(DummyVal) - scope-78
 |
-|---New For Each(true,true)[tuple] - scope-82
+|---New For Each(true,true)[tuple] - scope-84
     |   |
-    |   Project[int][0] - scope-79
+    |   Project[int][0] - scope-81
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-80
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-82
     |   |
-    |   |---Project[tuple][*] - scope-81
+    |   |---Project[tuple][*] - scope-83
     |
-    |---PoissonSample - scope-78
+    |---PoissonSample - scope-80
         |
-        |---Local Rearrange[tuple]{int}(false) - scope-74	->	 scope-53
+        |---Local Rearrange[tuple]{int}(false) - scope-75	->	 scope-53
             |   |
-            |   Project[int][0] - scope-75
+            |   Project[int][0] - scope-76
             |
             |---c: New For Each(false,false)[bag] - scope-15
                 |   |