You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/01/31 01:20:34 UTC

svn commit: r1563022 - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/...

Author: rohini
Date: Fri Jan 31 00:20:34 2014
New Revision: 1563022

URL: http://svn.apache.org/r1563022
Log:
PIG-3732: Use ONE_TO_ONE edge and IdentityInOut in orderby intermediate vertex

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
    pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java
    pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Jan 31 00:20:34 2014
@@ -57,7 +57,8 @@ import org.apache.pig.impl.util.Utils;
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
                                       implements Configurable {
 
-    protected Map<PigNullableWritable, DiscreteProbabilitySampleGenerator> weightedParts;
+    protected Map<PigNullableWritable, DiscreteProbabilitySampleGenerator> weightedParts =
+            new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
     protected PigNullableWritable[] quantiles;
     private RawComparator<PigNullableWritable> comparator;
     private PigContext pigContext;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Jan 31 00:20:34 2014
@@ -65,6 +65,8 @@ public abstract class PhysicalOperator e
     private static final Log log = LogFactory.getLog(PhysicalOperator.class);
 
     protected static final long serialVersionUID = 1L;
+    protected static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null);
+    protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null);
 
     // The degree of parallelism requested
     protected int requestedParallelism;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Fri Jan 31 00:20:34 2014
@@ -45,7 +45,6 @@ import org.apache.tez.runtime.library.ou
 public class POLocalRearrangeTez extends POLocalRearrange implements TezOutput {
 
     private static final long serialVersionUID = 1L;
-    protected static Result empty = new Result(POStatus.STATUS_NULL, null);
 
     protected String outputKey;
     protected transient KeyValueWriter writer;
@@ -160,7 +159,7 @@ public class POLocalRearrangeTez extends
                 } else {
                     illustratorMarkup(res.result, res.result, 0);
                 }
-                res = empty;
+                res = RESULT_EMPTY;
                 break;
             case POStatus.STATUS_EOP:
             case POStatus.STATUS_ERR:

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java Fri Jan 31 00:20:34 2014
@@ -141,7 +141,7 @@ public class POPartitionRearrangeTez ext
                 }
             }
 
-            res = empty;
+            res = RESULT_EMPTY;
         }
         return inp;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Fri Jan 31 00:20:34 2014
@@ -33,7 +33,6 @@ import org.apache.pig.data.AccumulativeB
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -109,11 +108,7 @@ public class POShuffleTezLoad extends PO
                         hasData = true;
                         cur = readers.get(i).getCurrentKey();
                         if (min == null || comparator.compare(min, cur) > 0) {
-                            min = PigNullableWritable.newInstance((PigNullableWritable)cur);
-                            if (isSkewedJoin) {
-                                ((NullablePartitionWritable)min).setKey(
-                                        ((NullablePartitionWritable)cur).getKey());
-                            }
+                            min = ((PigNullableWritable)cur).clone();
                         }
                     }
                 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Fri Jan 31 00:20:34 2014
@@ -151,6 +151,10 @@ public class PigProcessor implements Log
         for (POShuffleTezLoad shuffle : shuffles){
             shuffle.attachInputs(inputs, conf);
         }
+        LinkedList<POIdentityInOutTez> identityInOuts = PlanHelper.getPhysicalOperators(execPlan, POIdentityInOutTez.class);
+        for (POIdentityInOutTez identityInOut : identityInOuts){
+            identityInOut.attachInputs(inputs, conf);
+        }
         LinkedList<POFRJoinTez> broadcasts = PlanHelper.getPhysicalOperators(execPlan, POFRJoinTez.class);
         for (POFRJoinTez broadcast : broadcasts){
             broadcast.attachInputs(inputs, conf);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Fri Jan 31 00:20:34 2014
@@ -1445,8 +1445,12 @@ public class TezCompiler extends PhyPlan
      * @return Tez operator that now is finished with a store.
      * @throws PlanException
      */
-    private TezOperator endSingleInputWithStoreAndSample(POSort sort, POLocalRearrangeTez lr,
-            POLocalRearrangeTez lrSample) throws PlanException {
+    private TezOperator endSingleInputWithStoreAndSample(
+            POSort sort,
+            POLocalRearrangeTez lr,
+            POLocalRearrangeTez lrSample,
+            byte keyType,
+            Pair<POProject, Byte>[] fields) throws PlanException {
         if(compiledInputs.length>1) {
             int errCode = 2023;
             String msg = "Received a multi input plan when expecting only a single input one.";
@@ -1454,6 +1458,35 @@ public class TezCompiler extends PhyPlan
         }
         TezOperator oper = compiledInputs[0];
         if (!oper.isClosed()) {
+
+            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+            if (fields == null) {
+                // This is project *
+                PhysicalPlan ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                prj.setStar(true);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.TUPLE);
+                ep.add(prj);
+                eps.add(ep);
+            } else {
+                // Attach the sort plans to the local rearrange to get the
+                // projection.
+                eps.addAll(sort.getSortPlans());
+            }
+
+            try {
+                lr.setIndex(0);
+            } catch (ExecException e) {
+                int errCode = 2058;
+                String msg = "Unable to set index on newly created POLocalRearrange.";
+                throw new PlanException(msg, errCode, PigException.BUG, e);
+            }
+            lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : keyType);
+            lr.setPlans(eps);
+            lr.setResultType(DataType.TUPLE);
+            lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
+
             lr.setOutputKey(curTezOp.getOperatorKey().toString());
             oper.plan.addAsLeaf(lr);
 
@@ -1769,31 +1802,20 @@ public class TezCompiler extends PhyPlan
     }
 
     private TezOperator[] getSortJobs(
+            TezOperator inputOper,
+            POLocalRearrangeTez inputOperRearrange,
             POSort sort,
-            int rp,
             byte keyType,
             Pair<POProject, Byte>[] fields) throws PlanException{
         TezOperator[] opers = new TezOperator[2];
         TezOperator oper1 = getTezOp();
         tezPlan.add(oper1);
         opers[0] = oper1;
-        oper1.requestedParallelism = rp;
 
         long limit = sort.getLimit();
+        //TODO: TezOperator limit not used at all
         oper1.limit = limit;
 
-        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
-
-        POPackage pkg = getPackage(1, DataType.BYTEARRAY);
-        oper1.plan.add(pkg);
-
-        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
-        project.setResultType(DataType.BAG);
-        project.setStar(false);
-        project.setColumn(1);
-        POForEach forEach = TezCompilerUtil.getForEach(project, sort.getRequestedParallelism(), scope, nig);
-        oper1.plan.addAsLeaf(forEach);
-
         boolean[] sortOrder;
 
         List<Boolean> sortOrderList = sort.getMAscCols();
@@ -1805,42 +1827,18 @@ public class TezCompiler extends PhyPlan
             oper1.setSortOrder(sortOrder);
         }
 
-        if (fields == null) {
-            // This is project *
-            PhysicalPlan ep = new PhysicalPlan();
-            POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            prj.setStar(true);
-            prj.setOverloaded(false);
-            prj.setResultType(DataType.TUPLE);
-            ep.add(prj);
-            eps1.add(ep);
-        } else {
-            // Attach the sort plans to the local rearrange to get the
-            // projection.
-            eps1.addAll(sort.getSortPlans());
-        }
-
-        POLocalRearrangeTez lr = new POLocalRearrangeTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        try {
-            lr.setIndex(0);
-        } catch (ExecException e) {
-            int errCode = 2058;
-            String msg = "Unable to set index on newly created POLocalRearrange.";
-            throw new PlanException(msg, errCode, PigException.BUG, e);
-        }
-        lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : keyType);
-        lr.setPlans(eps1);
-        lr.setResultType(DataType.TUPLE);
-        lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
-        oper1.plan.addAsLeaf(lr);
-
+        POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
+                new OperatorKey(scope, nig.getNextNodeId(scope)),
+                inputOperRearrange);
+        identityInOutTez.setInputKey(inputOper.getOperatorKey().toString());
+        oper1.plan.addAsLeaf(identityInOutTez);
         oper1.setClosed(true);
 
         TezOperator oper2 = getTezOp();
         oper2.setGlobalSort(true);
         opers[1] = oper2;
         tezPlan.add(oper2);
-        lr.setOutputKey(oper2.getOperatorKey().toString());
+        identityInOutTez.setOutputKey(oper2.getOperatorKey().toString());
 
         if (limit!=-1) {
             POPackage pkg_c = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
@@ -1890,7 +1888,7 @@ public class TezCompiler extends PhyPlan
             combinePlan.addAsLeaf(lr_c2);
         }
 
-        pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
         pkg.setPkgr(new LitePackager());
         pkg.getPkgr().setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE : keyType);
         pkg.setNumInps(1);
@@ -1935,22 +1933,33 @@ public class TezCompiler extends PhyPlan
                 throw new PlanException(msg, errCode, PigException.BUG, ve);
             }
 
-            POLocalRearrangeTez lr = localRearrangeFactory.create(LocalRearrangeType.NULL);
+            POLocalRearrangeTez lr = new POLocalRearrangeTez(new OperatorKey(scope, nig.getNextNodeId(scope)));
             POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
 
-            TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample);
+            TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample, keyType, fields);
 
             int rp = Math.max(op.getRequestedParallelism(), 1);
 
             Pair<TezOperator, Integer> quantJobParallelismPair = getQuantileJobs(op, rp);
-            TezOperator[] sortOpers = getSortJobs(op, quantJobParallelismPair.second, keyType, fields);
+            TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
 
             TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]);
 
             // TODO: Convert to unsorted shuffle after TEZ-661
             // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
             // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
-            edge.partitionerClass = RoundRobinPartitioner.class;
+            // edge.partitionerClass = RoundRobinPartitioner.class;
+
+            // TODO: Test which is better - ONE_TO_ONE from prevOper to same number of
+            // tasks in sortOpers[0] and then sortOpers[1] with requestedParallelism
+            // or unsorted shuffled output (TEZ-661) from prevOper to
+            // sortOpers[0] with requestedParallelism and then sortOpers[1] with
+            // requestedParallelism
+            sortOpers[0].requestedParallelism = prevOper.requestedParallelism;
+            sortOpers[1].requestedParallelism = quantJobParallelismPair.second;
+            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+            edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+            edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
 
             handleSplitAndConnect(tezPlan, prevOper, quantJobParallelismPair.first, false);
             lr.setOutputKey(sortOpers[0].getOperatorKey().toString());

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java Fri Jan 31 00:20:34 2014
@@ -27,6 +27,7 @@ public class TezDAG extends DAG {
     public TezDAG(String name) {
         super(name);
         this.credentials = new Credentials();
+        super.setCredentials(credentials);
     }
 
     public Credentials getCredentials() {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Fri Jan 31 00:20:34 2014
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -64,7 +63,6 @@ public class WeightedRangePartitionerTez
 
         long start = System.currentTimeMillis();
         try {
-            weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
             DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
             InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
             convertToArray(quantilesList);

Modified: pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/NullablePartitionWritable.java Fri Jan 31 00:20:34 2014
@@ -55,7 +55,15 @@ public class NullablePartitionWritable e
 		return partitionIndex;
 	}
 
-  	@Override
+    @Override
+    public NullablePartitionWritable clone() throws CloneNotSupportedException {
+        NullablePartitionWritable clone = new NullablePartitionWritable();
+        clone.setKey(this.getKey());
+        clone.partitionIndex = this.partitionIndex;
+        return clone;
+    }
+
+    @Override
     public int compareTo(Object o) {
 		return key.compareTo(((NullablePartitionWritable)o).getKey());
 	}

Modified: pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java Fri Jan 31 00:20:34 2014
@@ -40,7 +40,7 @@ import org.apache.pig.data.Tuple;
 //Put in to make the compiler not complain about WritableComparable
 //being a generic type.
 @SuppressWarnings("unchecked")
-public abstract class PigNullableWritable implements WritableComparable {
+public abstract class PigNullableWritable implements WritableComparable, Cloneable {
 
     /**
      * indices in multiquery optimized maps
@@ -61,12 +61,17 @@ public abstract class PigNullableWritabl
 
     private byte mIndex;
 
-    public static PigNullableWritable newInstance(PigNullableWritable copy) throws Exception {
-        PigNullableWritable instance = copy.getClass().newInstance();
-        instance.mNull = copy.mNull;
-        instance.mValue = copy.mValue;
-        instance.mIndex = copy.mIndex;
-        return instance;
+    @Override
+    public PigNullableWritable clone() throws CloneNotSupportedException {
+        try {
+            PigNullableWritable clone = this.getClass().newInstance();
+            clone.mNull = this.mNull;
+            clone.mValue = this.mValue;
+            clone.mIndex = this.mIndex;
+            return clone;
+        } catch (Exception e) {
+            throw new RuntimeException("Exception while cloning " + this, e);
+        }
     }
 
     /**

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld Fri Jan 31 00:20:34 2014
@@ -2,28 +2,28 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-40
+# TEZ DAG plan: scope-36
 #--------------------------------------------------
-Tez vertex scope-11	->	Tez vertex scope-30,Tez vertex scope-19,
-Tez vertex scope-19	->	Tez vertex scope-30,
-Tez vertex scope-30	->	Tez vertex scope-35,
-Tez vertex scope-35
+Tez vertex scope-11	->	Tez vertex scope-29,Tez vertex scope-18,
+Tez vertex scope-18	->	Tez vertex scope-29,
+Tez vertex scope-29	->	Tez vertex scope-31,
+Tez vertex scope-31
 
 Tez vertex scope-11
 # Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-15	->	 scope-19
+Local Rearrange[tuple]{tuple}(false) - scope-14	->	 scope-18
 |   |
-|   Constant(DummyVal) - scope-14
+|   Constant(DummyVal) - scope-13
 |
-|---ReservoirSample - scope-18
+|---ReservoirSample - scope-17
     |
-    |---New For Each(false)[tuple] - scope-17
+    |---New For Each(false)[tuple] - scope-16
         |   |
-        |   Project[int][0] - scope-16
+        |   Project[int][0] - scope-15
         |
-        |---Local Rearrange[tuple]{bytearray}(false) - scope-13	->	 scope-30
+        |---b: Local Rearrange[tuple]{int}(false) - scope-12	->	 scope-29
             |   |
-            |   Constant(DummyVal) - scope-12
+            |   Project[int][0] - scope-8
             |
             |---a: New For Each(false,false)[bag] - scope-7
                 |   |
@@ -36,42 +36,36 @@ Local Rearrange[tuple]{tuple}(false) - s
                 |   |---Project[bytearray][1] - scope-4
                 |
                 |---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
-Tez vertex scope-19
+Tez vertex scope-18
 # Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-29	->	 scope-30
+Local Rearrange[tuple]{bytearray}(false) - scope-28	->	 scope-29
 |   |
-|   Constant(DummyVal) - scope-28
+|   Constant(DummyVal) - scope-27
 |
-|---New For Each(false)[tuple] - scope-27
+|---New For Each(false)[tuple] - scope-26
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-26
+    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-25
     |   |
-    |   |---Project[tuple][*] - scope-25
+    |   |---Project[tuple][*] - scope-24
     |
-    |---New For Each(false,false)[tuple] - scope-24
+    |---New For Each(false,false)[tuple] - scope-23
         |   |
-        |   Constant(1) - scope-23
+        |   Constant(1) - scope-22
         |   |
-        |   Project[bag][1] - scope-21
+        |   Project[bag][1] - scope-20
         |
-        |---Package(Packager)[tuple]{bytearray} - scope-20
-Tez vertex scope-30
+        |---Package(Packager)[tuple]{bytearray} - scope-19
+Tez vertex scope-29
 # Plan on vertex
-b: Local Rearrange[tuple]{int}(false) - scope-34	->	 scope-35
+POIdentityInOutTez - scope-30	->	 scope-31
 |   |
 |   Project[int][0] - scope-8
-|
-|---New For Each(true)[bag] - scope-33
-    |   |
-    |   Project[bag][1] - scope-32
-    |
-    |---Package(Packager)[tuple]{bytearray} - scope-31
-Tez vertex scope-35
+Tez vertex scope-31
 # Plan on vertex
 b: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-10
 |
-|---New For Each(true)[tuple] - scope-38
+|---New For Each(true)[tuple] - scope-34
     |   |
-    |   Project[bag][1] - scope-37
+    |   Project[bag][1] - scope-33
     |
-    |---Package(LitePackager)[tuple]{int} - scope-36
\ No newline at end of file
+    |---Package(LitePackager)[tuple]{int} - scope-32
\ No newline at end of file

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld?rev=1563022&r1=1563021&r2=1563022&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld Fri Jan 31 00:20:34 2014
@@ -2,16 +2,16 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-200
+# TEZ DAG plan: scope-196
 #--------------------------------------------------
-Tez vertex scope-106	->	Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-152,Tez vertex scope-141,Tez vertex scope-178,Tez vertex scope-178,
+Tez vertex scope-106	->	Tez vertex scope-109,Tez vertex scope-122,Tez vertex scope-122,Tez vertex scope-126,Tez vertex scope-151,Tez vertex scope-140,Tez vertex scope-174,Tez vertex scope-174,
 Tez vertex scope-122
+Tez vertex scope-174
+Tez vertex scope-140	->	Tez vertex scope-151,
+Tez vertex scope-151	->	Tez vertex scope-153,
+Tez vertex scope-153
 Tez vertex scope-126
-Tez vertex scope-141	->	Tez vertex scope-152,
-Tez vertex scope-152	->	Tez vertex scope-157,
-Tez vertex scope-157
 Tez vertex scope-109
-Tez vertex scope-178
 
 Tez vertex scope-106
 # Plan on vertex
@@ -29,19 +29,19 @@ Tez vertex scope-106
 |   |   |
 |   |   1-2: Split - scope-83
 |   |   |   |
-|   |   |   Local Rearrange[tuple]{tuple}(false) - scope-137	->	 scope-141
+|   |   |   Local Rearrange[tuple]{tuple}(false) - scope-136	->	 scope-140
 |   |   |   |   |
-|   |   |   |   Constant(DummyVal) - scope-136
+|   |   |   |   Constant(DummyVal) - scope-135
 |   |   |   |
-|   |   |   |---ReservoirSample - scope-140
+|   |   |   |---ReservoirSample - scope-139
 |   |   |       |
-|   |   |       |---New For Each(false)[tuple] - scope-139
+|   |   |       |---New For Each(false)[tuple] - scope-138
 |   |   |           |   |
-|   |   |           |   Project[int][0] - scope-138
+|   |   |           |   Project[int][0] - scope-137
 |   |   |           |
-|   |   |           |---Local Rearrange[tuple]{bytearray}(false) - scope-135	->	 scope-152
+|   |   |           |---e1: Local Rearrange[tuple]{int}(false) - scope-134	->	 scope-151
 |   |   |               |   |
-|   |   |               |   Constant(DummyVal) - scope-134
+|   |   |               |   Project[int][0] - scope-88
 |   |   |               |
 |   |   |               |---e: Filter[bag] - scope-84
 |   |   |                   |   |
@@ -55,9 +55,9 @@ Tez vertex scope-106
 |   |   |   |   |
 |   |   |   |   f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-99
 |   |   |   |   |
-|   |   |   |   f2: Local Rearrange[tuple]{tuple}(false) - scope-177	->	 scope-178
+|   |   |   |   f2: Local Rearrange[tuple]{tuple}(false) - scope-173	->	 scope-174
 |   |   |   |   |   |
-|   |   |   |   |   Project[tuple][*] - scope-176
+|   |   |   |   |   Project[tuple][*] - scope-172
 |   |   |   |
 |   |   |   |---f1: Limit - scope-95
 |   |   |       |
@@ -83,21 +83,21 @@ Tez vertex scope-106
 |   |   |   |
 |   |   |   Project[int][0] - scope-48
 |   |   |
-|   |   c2: Local Rearrange[tuple]{int}(false) - scope-192	->	 scope-126
+|   |   c2: Local Rearrange[tuple]{int}(false) - scope-188	->	 scope-126
 |   |   |   |
-|   |   |   Project[int][0] - scope-194
+|   |   |   Project[int][0] - scope-190
 |   |   |
-|   |   |---c3: New For Each(false,false)[bag] - scope-180
+|   |   |---c3: New For Each(false,false)[bag] - scope-176
 |   |       |   |
-|   |       |   Project[int][0] - scope-181
+|   |       |   Project[int][0] - scope-177
 |   |       |   |
-|   |       |   POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-182
+|   |       |   POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-178
 |   |       |   |
-|   |       |   |---Project[bag][0] - scope-183
+|   |       |   |---Project[bag][0] - scope-179
 |   |       |       |
-|   |       |       |---Project[bag][1] - scope-184
+|   |       |       |---Project[bag][1] - scope-180
 |   |       |
-|   |       |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-195
+|   |       |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-191
 |   |
 |   |---c: Filter[bag] - scope-34
 |       |   |
@@ -111,9 +111,9 @@ Tez vertex scope-106
 |   |   |
 |   |   d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-80
 |   |   |
-|   |   f2: Local Rearrange[tuple]{tuple}(false) - scope-175	->	 scope-178
+|   |   f2: Local Rearrange[tuple]{tuple}(false) - scope-171	->	 scope-174
 |   |   |   |
-|   |   |   Project[tuple][*] - scope-174
+|   |   |   Project[tuple][*] - scope-170
 |   |
 |   |---d1: Filter[bag] - scope-73
 |       |   |
@@ -153,72 +153,71 @@ c1: Store(file:///tmp/output/c1:org.apac
     |   Project[bag][2] - scope-52
     |
     |---c1: Package(Packager)[tuple]{int} - scope-46
-Tez vertex scope-126
-# Combine plan on edge <scope-106>
-c2: Local Rearrange[tuple]{int}(false) - scope-196	->	 scope-126
-|   |
-|   Project[int][0] - scope-198
-|
-|---c3: New For Each(false,false)[bag] - scope-185
-    |   |
-    |   Project[int][0] - scope-186
-    |   |
-    |   POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-187
-    |   |
-    |   |---Project[bag][1] - scope-188
-    |
-    |---c2: Package(CombinerPackager)[tuple]{int} - scope-191
+Tez vertex scope-174
 # Plan on vertex
-c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-68
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
 |
-|---c3: New For Each(false,false)[bag] - scope-67
-    |   |
-    |   Project[int][0] - scope-61
-    |   |
-    |   POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-65
-    |   |
-    |   |---Project[bag][1] - scope-189
-    |
-    |---c2: Package(CombinerPackager)[tuple]{int} - scope-58
-Tez vertex scope-141
+|---f2: Package(Packager)[tuple]{tuple} - scope-175
+Tez vertex scope-140
 # Plan on vertex
-Local Rearrange[tuple]{bytearray}(false) - scope-151	->	 scope-152
+Local Rearrange[tuple]{bytearray}(false) - scope-150	->	 scope-151
 |   |
-|   Constant(DummyVal) - scope-150
+|   Constant(DummyVal) - scope-149
 |
-|---New For Each(false)[tuple] - scope-149
+|---New For Each(false)[tuple] - scope-148
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-148
+    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-147
     |   |
-    |   |---Project[tuple][*] - scope-147
+    |   |---Project[tuple][*] - scope-146
     |
-    |---New For Each(false,false)[tuple] - scope-146
+    |---New For Each(false,false)[tuple] - scope-145
         |   |
-        |   Constant(1) - scope-145
+        |   Constant(1) - scope-144
         |   |
-        |   Project[bag][1] - scope-143
+        |   Project[bag][1] - scope-142
         |
-        |---Package(Packager)[tuple]{bytearray} - scope-142
-Tez vertex scope-152
+        |---Package(Packager)[tuple]{bytearray} - scope-141
+Tez vertex scope-151
 # Plan on vertex
-e1: Local Rearrange[tuple]{int}(false) - scope-156	->	 scope-157
+POIdentityInOutTez - scope-152	->	 scope-153
 |   |
 |   Project[int][0] - scope-88
+Tez vertex scope-153
+# Plan on vertex
+e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-90
 |
-|---New For Each(true)[bag] - scope-155
+|---New For Each(true)[tuple] - scope-156
     |   |
-    |   Project[bag][1] - scope-154
+    |   Project[bag][1] - scope-155
     |
-    |---Package(Packager)[tuple]{bytearray} - scope-153
-Tez vertex scope-157
+    |---Package(LitePackager)[tuple]{int} - scope-154
+Tez vertex scope-126
+# Combine plan on edge <scope-106>
+c2: Local Rearrange[tuple]{int}(false) - scope-192	->	 scope-126
+|   |
+|   Project[int][0] - scope-194
+|
+|---c3: New For Each(false,false)[bag] - scope-181
+    |   |
+    |   Project[int][0] - scope-182
+    |   |
+    |   POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-183
+    |   |
+    |   |---Project[bag][1] - scope-184
+    |
+    |---c2: Package(CombinerPackager)[tuple]{int} - scope-187
 # Plan on vertex
-e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-90
+c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-68
 |
-|---New For Each(true)[tuple] - scope-160
+|---c3: New For Each(false,false)[bag] - scope-67
+    |   |
+    |   Project[int][0] - scope-61
+    |   |
+    |   POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-65
     |   |
-    |   Project[bag][1] - scope-159
+    |   |---Project[bag][1] - scope-185
     |
-    |---Package(LitePackager)[tuple]{int} - scope-158
+    |---c2: Package(CombinerPackager)[tuple]{int} - scope-58
 Tez vertex scope-109
 # Plan on vertex
 b1: Split - scope-20
@@ -237,9 +236,4 @@ b1: Split - scope-20
 |       |       |
 |       |       |---Project[bag][1] - scope-28
 |
-|---b1: Package(Packager)[tuple]{int} - scope-17
-Tez vertex scope-178
-# Plan on vertex
-f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-105
-|
-|---f2: Package(Packager)[tuple]{tuple} - scope-179
\ No newline at end of file
+|---b1: Package(Packager)[tuple]{int} - scope-17
\ No newline at end of file