You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/01 21:49:10 UTC

svn commit: r1583768 [1/2] - in /pig/branches/tez: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executi...

Author: rohini
Date: Tue Apr  1 19:49:09 2014
New Revision: 1583768

URL: http://svn.apache.org/r1583768
Log:
PIG-3835: Improve performance of union (rohini)

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
Removed:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC19.gld
Modified:
    pig/branches/tez/src/org/apache/pig/PigConfiguration.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Tue Apr  1 19:49:09 2014
@@ -81,6 +81,11 @@ public class PigConfiguration {
     public static final String OPT_ACCUMULATOR = "opt.accumulator";
 
     /**
+     * This key is used to enable union optimization.
+     */
+    public static final String TEZ_OPT_UNION = "pig.tez.opt.union";
+
+    /**
      * This key is used to define whether to reuse AM in Tez jobs.
      */
     public static final String TEZ_SESSION_REUSE = "pig.tez.session.reuse";
@@ -186,7 +191,7 @@ public class PigConfiguration {
      * of nested distinct or sort
      */
     public static final String PIG_EXEC_NO_SECONDARY_KEY = "pig.exec.nosecondarykey";
- 
+
     /**
      * This key used to control the sample size of RandomeSampleLoader for
      * order-by. The default value is 100 rows per task.

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Tue Apr  1 19:49:09 2014
@@ -34,13 +34,13 @@ import org.apache.pig.pen.util.ExampleTu
 
 public class POLimit extends PhysicalOperator {
 	   /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
     // Counts for outputs processed
     private long soFar = 0;
-    
+
     // Number of limited outputs
     long mLimit;
 
@@ -62,11 +62,11 @@ public class POLimit extends PhysicalOpe
     public POLimit(OperatorKey k, int rp, List<PhysicalOperator> inputs) {
         super(k, rp, inputs);
     }
-    
+
     public void setLimit(long limit) {
     	mLimit = limit;
     }
-    
+
     public long getLimit() {
     	return mLimit;
     }
@@ -80,8 +80,8 @@ public class POLimit extends PhysicalOpe
     }
 
     /**
-     * Counts the number of tuples processed into static variable soFar, if the number of tuples processed reach the 
-     * limit, return EOP; Otherwise, return the tuple 
+     * Counts the number of tuples processed into static variable soFar, if the number of tuples processed reach the
+     * limit, return EOP; Otherwise, return the tuple
      */
     @Override
     public Result getNextTuple() throws ExecException {
@@ -117,12 +117,12 @@ public class POLimit extends PhysicalOpe
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
                 break;
-            
+
             illustratorMarkup(inp.result, null, 0);
             // illustrator ignore LIMIT before the post processing
             if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar>=mLimit)
             	inp.returnStatus = POStatus.STATUS_EOP;
-            
+
             soFar++;
             break;
         }
@@ -161,11 +161,13 @@ public class POLimit extends PhysicalOpe
             NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
             this.requestedParallelism, this.inputs);
         newLimit.mLimit = this.mLimit;
-        newLimit.expressionPlan = this.expressionPlan.clone();
+        if (this.expressionPlan != null) {
+            newLimit.expressionPlan = this.expressionPlan.clone();
+        }
         newLimit.addOriginalLocation(alias, getOriginalLocations());
         return newLimit;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         if(illustrator != null) {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Tue Apr  1 19:49:09 2014
@@ -24,7 +24,7 @@ public class Packager implements Seriali
     protected DataBag[] bags;
 
     public static enum PackageType {
-        GROUP, JOIN, UNION
+        GROUP, JOIN
     };
 
     // The key being worked on
@@ -273,6 +273,7 @@ public class Packager implements Seriali
         this.numInputs = numInputs;
     }
 
+    @Override
     public Packager clone() throws CloneNotSupportedException {
         Packager clone = (Packager) super.clone();
         clone.setNumInputs(numInputs);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Tue Apr  1 19:49:09 2014
@@ -18,13 +18,11 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -155,17 +153,20 @@ public class MultiQueryOptimizerTez exte
                 plan.disconnect(splittee, succTezOperator);
                 TezCompilerUtil.connect(plan, splitter, succTezOperator, edge);
 
-                for (TezOperator succ : succs) {
-                    try {
-                        List<POFRJoinTez> frJoins = PlanHelper.getPhysicalOperators(succ.plan, POFRJoinTez.class);
-                        for (POFRJoinTez frJoin : frJoins) {
-                            if (frJoin.getInputKeys().contains(splittee.getOperatorKey().toString())) {
-                                frJoin.getInputKeys().set(frJoin.getInputKeys().indexOf(splittee.getOperatorKey().toString()),
-                                        splitter.getOperatorKey().toString());
-                            }
-                        }
-                    } catch (VisitorException e) {
-                        throw new PlanException(e);
+                try {
+                    List<TezInput> inputs = PlanHelper.getPhysicalOperators(succTezOperator.plan, TezInput.class);
+                    for (TezInput input : inputs) {
+                        input.replaceInput(splittee.getOperatorKey().toString(),
+                                splitter.getOperatorKey().toString());
+                    }
+                } catch (VisitorException e) {
+                    throw new PlanException(e);
+                }
+
+                if (succTezOperator.isUnion()) {
+                    int index = succTezOperator.getUnionPredecessors().indexOf(splittee.getOperatorKey());
+                    if (index > -1) {
+                        succTezOperator.getUnionPredecessors().set(index, splitter.getOperatorKey());
                     }
                 }
             }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java Tue Apr  1 19:49:09 2014
@@ -47,7 +47,7 @@ import com.google.common.collect.Lists;
  * POFRJoinTez is used on the backend to load replicated table from Tez
  * ShuffleUnorderedKVInput and load fragmented table from data pipeline.
  */
-public class POFRJoinTez extends POFRJoin implements TezLoad {
+public class POFRJoinTez extends POFRJoin implements TezInput {
 
     private static final Log log = LogFactory.getLog(POFRJoinTez.class);
     private static final long serialVersionUID = 1L;
@@ -64,6 +64,18 @@ public class POFRJoinTez extends POFRJoi
     }
 
     @Override
+    public String[] getTezInputs() {
+        return inputKeys.toArray(new String[inputKeys.size()]);
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (inputKeys.contains(oldInputKey)) {
+            inputKeys.set(inputKeys.indexOf(oldInputKey), newInputKey);
+        }
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
         String cacheKey = "replicatemap-" + getOperatorKey().toString();
         Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
@@ -73,7 +85,6 @@ public class POFRJoinTez extends POFRJoi
         }
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
     public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
             throws ExecException {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java Tue Apr  1 19:49:09 2014
@@ -32,10 +32,10 @@ import org.apache.pig.impl.io.PigNullabl
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
 /**
  * POIdentityInOutTez is used to pass through tuples as is to next vertex from
@@ -44,7 +44,7 @@ import org.apache.tez.runtime.library.in
  * previous vertex data uses POIdentityInOutTez.
  */
 @InterfaceAudience.Private
-public class POIdentityInOutTez extends POLocalRearrangeTez implements TezLoad, TezOutput {
+public class POIdentityInOutTez extends POLocalRearrangeTez implements TezInput, TezOutput {
 
     private static final long serialVersionUID = 1L;
     private String inputKey;
@@ -62,6 +62,18 @@ public class POIdentityInOutTez extends 
     }
 
     @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(inputKey)) {
+            inputKey = newInputKey;
+        }
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
     }
 
@@ -73,12 +85,12 @@ public class POIdentityInOutTez extends 
             throw new ExecException("Input from vertex " + inputKey + " is missing");
         }
         try {
-            if (input instanceof ShuffledMergedInput) {
-                shuffleInput = true;
-                ShuffledMergedInput smInput = (ShuffledMergedInput) input;
-                shuffleReader = smInput.getReader();
+            Reader r = input.getReader();
+            if (r instanceof KeyValueReader) {
+                reader = (KeyValueReader) r;
             } else {
-                reader = (KeyValueReader) input.getReader();
+                shuffleInput = true;
+                shuffleReader = (KeyValuesReader) r;
             }
         } catch (Exception e) {
             throw new ExecException(e);
@@ -141,7 +153,7 @@ public class POIdentityInOutTez extends 
 
     @Override
     public String name() {
-        return "POIdentityInOutTez - " + mKey.toString() + "\t->\t " + outputKey;
+        return "POIdentityInOutTez - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
     }
 
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Tue Apr  1 19:49:09 2014
@@ -48,8 +48,6 @@ public class POLocalRearrangeTez extends
     protected String outputKey;
     protected transient KeyValueWriter writer;
 
-    // Tez union is implemented as LR + Pkg
-    protected boolean isUnion = false;
     protected boolean isSkewedJoin = false;
 
     public POLocalRearrangeTez(OperatorKey k) {
@@ -64,7 +62,6 @@ public class POLocalRearrangeTez extends
         super(copy);
         if (copy instanceof POLocalRearrangeTez) {
             POLocalRearrangeTez copyTez = (POLocalRearrangeTez) copy;
-            this.isUnion = copyTez.isUnion;
             this.isSkewedJoin = copyTez.isSkewedJoin;
             this.outputKey = copyTez.outputKey;
         }
@@ -78,14 +75,6 @@ public class POLocalRearrangeTez extends
         this.outputKey = outputKey;
     }
 
-    public boolean isUnion() {
-        return isUnion;
-    }
-
-    public void setUnion(boolean isUnion) {
-        this.isUnion = isUnion;
-    }
-
     public boolean isSkewedJoin() {
         return isSkewedJoin;
     }
@@ -95,6 +84,18 @@ public class POLocalRearrangeTez extends
     }
 
     @Override
+    public String[] getTezOutputs() {
+        return new String[] { outputKey };
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        if (oldOutputKey.equals(outputKey)) {
+            outputKey = newOutputKey;
+        }
+    }
+
+    @Override
     public void attachOutputs(Map<String, LogicalOutput> outputs,
             Configuration conf) throws ExecException {
         LogicalOutput logicalOut = outputs.get(outputKey);
@@ -136,18 +137,9 @@ public class POLocalRearrangeTez extends
             case POStatus.STATUS_OK:
                 if (illustrator == null) {
                     Tuple result = (Tuple) res.result;
-                    Byte index = (Byte)result.get(0);
-                    PigNullableWritable key = null;
-                    NullableTuple val = null;
-                    if (isUnion) {
-                        // Use the whole tuple as key and set value to null
-                        key = HDataType.getWritableComparableTypes(result.get(1), keyType);
-                        val = new NullableTuple();
-                        val.setNull(true);
-                    } else {
-                        key = HDataType.getWritableComparableTypes(result.get(1), keyType);
-                        val = new NullableTuple((Tuple)result.get(2));
-                    }
+                    Byte index = (Byte) result.get(0);
+                    PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+                    NullableTuple val = new NullableTuple((Tuple)result.get(2));
 
                     // Both the key and the value need the index.  The key needs it so
                     // that it can be sorted on the index in addition to the key

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Tue Apr  1 19:49:09 2014
@@ -39,14 +39,13 @@ import org.apache.pig.impl.io.PigNullabl
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
-public class POShuffleTezLoad extends POPackage implements TezLoad {
+public class POShuffleTezLoad extends POPackage implements TezInput {
 
     private static final long serialVersionUID = 1L;
 
     protected List<String> inputKeys = new ArrayList<String>();
-    protected List<ShuffledMergedInput> inputs = new ArrayList<ShuffledMergedInput>();
+    protected List<LogicalInput> inputs = new ArrayList<LogicalInput>();
     protected List<KeyValuesReader> readers = new ArrayList<KeyValuesReader>();
 
     private boolean[] finished;
@@ -60,6 +59,18 @@ public class POShuffleTezLoad extends PO
     }
 
     @Override
+    public String[] getTezInputs() {
+        return inputKeys.toArray(new String[inputKeys.size()]);
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (inputKeys.contains(oldInputKey)) {
+            inputKeys.set(inputKeys.indexOf(oldInputKey), newInputKey);
+        }
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
     }
 
@@ -71,11 +82,8 @@ public class POShuffleTezLoad extends PO
         try {
             for (String key : inputKeys) {
                 LogicalInput input = inputs.get(key);
-                if (input instanceof ShuffledMergedInput) {
-                    ShuffledMergedInput smInput = (ShuffledMergedInput) input;
-                    this.inputs.add(smInput);
-                    this.readers.add(smInput.getReader());
-                }
+                this.inputs.add(input);
+                this.readers.add((KeyValuesReader)input.getReader());
             }
 
             // We need to adjust numInputs because it's possible for both
@@ -93,7 +101,7 @@ public class POShuffleTezLoad extends PO
             for (int i = 0; i < numInputs; i++) {
                 finished[i] = !readers.get(i).next();
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new ExecException(e);
         }
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java Tue Apr  1 19:49:09 2014
@@ -36,7 +36,7 @@ import org.apache.tez.runtime.library.ap
 /**
  * POSimpleTezLoad is used on the backend to read tuples from a Tez MRInput
  */
-public class POSimpleTezLoad extends POLoad implements TezLoad {
+public class POSimpleTezLoad extends POLoad implements TezInput {
 
     private static final long serialVersionUID = 1L;
     private String inputKey;
@@ -48,6 +48,18 @@ public class POSimpleTezLoad extends POL
     }
 
     @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(inputKey)) {
+            inputKey = newInputKey;
+        }
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java Tue Apr  1 19:49:09 2014
@@ -39,20 +39,40 @@ public class POStoreTez extends POStore 
     private static final long serialVersionUID = 1L;
     private transient MROutput output;
     private transient KeyValueWriter writer;
+    private String outputKey;
 
     public POStoreTez(OperatorKey k) {
         super(k);
+        this.outputKey = k.toString();
     }
 
     public POStoreTez(POStore copy) {
         super(copy);
+        this.outputKey = copy.getOperatorKey().toString();
+    }
+
+    public String getOutputKey() {
+        return outputKey;
+    }
+
+    public void setOutputKey(String outputKey) {
+        this.outputKey = outputKey;
+    }
+
+    @Override
+    public String[] getTezOutputs() {
+        return new String[] { outputKey };
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
     }
 
     @Override
     public void attachOutputs(Map<String, LogicalOutput> outputs,
             Configuration conf)
             throws ExecException {
-        LogicalOutput logicalOut = outputs.get(getOperatorKey().toString());
+        LogicalOutput logicalOut = outputs.get(outputKey);
         if (logicalOut == null || !(logicalOut instanceof MROutput)) {
             throw new ExecException("POStoreTez only accepts MROutput. key =  "
                     + getOperatorKey() + ", outputs = " + outputs);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java Tue Apr  1 19:49:09 2014
@@ -41,7 +41,7 @@ import org.apache.tez.runtime.library.ap
  * POValueInputTez is used read tuples from a Tez Intermediate output from a 1-1
  * edge
  */
-public class POValueInputTez extends PhysicalOperator implements TezLoad {
+public class POValueInputTez extends PhysicalOperator implements TezInput {
 
     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(POValueInputTez.class);
@@ -57,6 +57,18 @@ public class POValueInputTez extends Phy
     }
 
     @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(inputKey)) {
+            inputKey = newInputKey;
+        }
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Tue Apr  1 19:49:09 2014
@@ -29,6 +29,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -59,6 +60,18 @@ public class POValueOutputTez extends Ph
     }
 
     @Override
+    public String[] getTezOutputs() {
+        return outputKeys.toArray(new String[outputKeys.size()]);
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        if (outputKeys.remove(oldOutputKey)) {
+            outputKeys.add(newOutputKey);
+        }
+    }
+
+    @Override
     public void attachOutputs(Map<String, LogicalOutput> outputs,
             Configuration conf) throws ExecException {
         writers = new ArrayList<KeyValueWriter>();
@@ -147,4 +160,24 @@ public class POValueOutputTez extends Ph
         }
     }
 
+    //TODO: Remove after PIG-3775/TEZ-661
+    public static class EmptyWritableComparator implements RawComparator<EmptyWritable> { // every key compares equal
+
+        @Override
+        public int compare(EmptyWritable o1, EmptyWritable o2) {
+            return 0;
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // All keys are treated as equal. Observed effects of the return value:
+            // 0 reverses the input order and groups all values into a single record
+            // on the reducer, which is additional overhead. -1 or 1 return input in
+            // arbitrary order but invoke the comparator far more often (-1 even more
+            // than 1). Going with 0 for now; after TEZ-661 this is no longer required.
+            return 0;
+        }
+
+    }
+
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Tue Apr  1 19:49:09 2014
@@ -40,8 +40,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.data.BinSedesTuple;
 import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
@@ -176,9 +176,9 @@ public class PigProcessor implements Log
             inputsToSkip.add(sampleVertex);
         }
 
-        LinkedList<TezLoad> tezLds = PlanHelper.getPhysicalOperators(execPlan, TezLoad.class);
-        for (TezLoad tezLd : tezLds){
-            tezLd.addInputsToSkip(inputsToSkip);
+        LinkedList<TezInput> tezInputs = PlanHelper.getPhysicalOperators(execPlan, TezInput.class);
+        for (TezInput tezInput : tezInputs){
+            tezInput.addInputsToSkip(inputsToSkip);
         }
 
         LinkedList<ReadScalarsTez> scalarInputs = new LinkedList<ReadScalarsTez>();
@@ -201,8 +201,8 @@ public class PigProcessor implements Log
             }
         }
 
-        for (TezLoad tezLd : tezLds){
-            tezLd.attachInputs(inputs, conf);
+        for (TezInput tezInput : tezInputs){
+            tezInput.attachInputs(inputs, conf);
         }
 
         for (ReadScalarsTez scalarInput: scalarInputs) {
@@ -272,7 +272,7 @@ public class PigProcessor implements Log
         Object val = reader.getCurrentValue();
         if (val != null) {
             // Sample is not empty
-            BinSedesTuple t = (BinSedesTuple) val;
+            Tuple t = (Tuple) val;
             sampleMap = (Map<String, Object>) t.get(0);
         }
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java Tue Apr  1 19:49:09 2014
@@ -30,7 +30,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
-public class ReadScalarsTez extends EvalFunc<Object> implements TezLoad {
+public class ReadScalarsTez extends EvalFunc<Object> implements TezInput {
     private static final Log LOG = LogFactory.getLog(ReadScalarsTez.class);
     private String inputKey;
     private transient Tuple t;
@@ -41,6 +41,18 @@ public class ReadScalarsTez extends Eval
     }
 
     @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey }; // exactly one Tez input: inputKey
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(inputKey)) { // no-op unless oldInputKey is the current input
+            inputKey = newInputKey;
+        }
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
         String cacheKey = "scalar-" + inputKey;
         Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/RoundRobinPartitioner.java Tue Apr  1 19:49:09 2014
@@ -19,13 +19,12 @@ package org.apache.pig.backend.hadoop.ex
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pig.impl.io.PigNullableWritable;
 
-public class RoundRobinPartitioner extends Partitioner<PigNullableWritable, Writable> {
+public class RoundRobinPartitioner extends Partitioner<Writable, Writable> {
     private int num = 0;
 
     @Override
-    public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
+    public int getPartition(Writable key, Writable value, int numPartitions) {
         num = ++num % numPartitions;
         return num;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Apr  1 19:49:09 2014
@@ -83,8 +83,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.operators.POCounterStatsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.operators.POCounterTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.operators.PORankTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.operators.POShuffledValueInputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
-import org.apache.pig.data.BinSedesTuple;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -278,9 +278,7 @@ public class TezCompiler extends PhyPlan
 
                     TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, from, tezOp);
                     //TODO shared edge once support is available in Tez
-                    edge.dataMovementType = DataMovementType.BROADCAST;
-                    edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-                    edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+                    TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
                 }
             }
         }
@@ -353,15 +351,11 @@ public class TezCompiler extends PhyPlan
 
                 // Connect splitter to splittee
                 TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, storeTezOper, storeOnlyTezOperator);
-                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
-                edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-                edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+                TezCompilerUtil.configureValueOnlyTupleOutput(edge,  DataMovementType.ONE_TO_ONE);
                 storeOnlyTezOperator.setRequestedParallelismByReference(storeTezOper);
 
                 edge = TezCompilerUtil.connect(tezPlan, storeTezOper, curTezOp);
-                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
-                edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-                edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+                TezCompilerUtil.configureValueOnlyTupleOutput(edge,  DataMovementType.ONE_TO_ONE);
                 curTezOp.setRequestedParallelismByReference(storeTezOper);
 
                 return;
@@ -1073,9 +1067,7 @@ public class TezCompiler extends PhyPlan
         try{
             nonBlocking(op);
             phyToTezOpMap.put(op, curTezOp);
-            if (op.getPkgr().getPackageType() == PackageType.UNION) {
-                curTezOp.markUnion();
-            } else if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+            if (op.getPkgr().getPackageType() == PackageType.JOIN) {
                 curTezOp.markRegularJoin();
             } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {
@@ -1141,11 +1133,7 @@ public class TezCompiler extends PhyPlan
             // Connect counterOper vertex to rankOper vertex by 1-1 edge
             rankOper.setRequestedParallelismByReference(counterOper);
             TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, counterOper, rankOper);
-            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
-            edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-            edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
-            edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
-            edge.setIntermediateOutputValueClass(BinSedesTuple.class.getName());
+            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
             counterTez.setTuplesOutputKey(rankOper.getOperatorKey().toString());
             rankTez.setTuplesInputKey(counterOper.getOperatorKey().toString());
 
@@ -1161,12 +1149,8 @@ public class TezCompiler extends PhyPlan
 
             // Connect statsOper vertex to rankOper vertex by Broadcast edge
             edge = TezCompilerUtil.connect(tezPlan, statsOper, rankOper);
-            edge.dataMovementType = DataMovementType.BROADCAST;
-            edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-            edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
-            edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
-            // Map of task id, offset count based on total number of records
-            edge.setIntermediateOutputValueClass(BinSedesTuple.class.getName());
+            // Map of task id, offset count based on total number of records is in the value
+            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
             counterStatsTez.setOutputKey(rankOper.getOperatorKey().toString());
             rankTez.setStatsInputKey(statsOper.getOperatorKey().toString());
 
@@ -1409,9 +1393,7 @@ public class TezCompiler extends PhyPlan
 
                 // Configure broadcast edges for distribution map
                 edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
-                edge.dataMovementType = DataMovementType.BROADCAST;
-                edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-                edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+                TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
                 sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
 
                 // Configure skewed partitioner for join
@@ -1953,9 +1935,7 @@ public class TezCompiler extends PhyPlan
             lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
 
             edge = TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
-            edge.dataMovementType = DataMovementType.BROADCAST;
-            edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-            edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
             POValueOutputTez sampleOut = (POValueOutputTez)quantJobParallelismPair.first.plan.getLeaves().get(0);
             sampleOut.addOutputKey(sortOpers[0].getOperatorKey().toString());
             sortOpers[0].sampleOperator = quantJobParallelismPair.first;
@@ -1998,9 +1978,7 @@ public class TezCompiler extends PhyPlan
             output.addOutputKey(curTezOp.getOperatorKey().toString());
             TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
             //TODO shared edge once support is available in Tez
-            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
-            edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-            edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
             curTezOp.setRequestedParallelismByReference(splitOp);
             POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
             input.setInputKey(splitOp.getOperatorKey().toString());
@@ -2044,45 +2022,31 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitUnion(POUnion op) throws VisitorException {
         try {
-            // Add alias vertex. This will be converted to VertexGroup by
-            // TezDagBuilder.
-            TezOperator newTezOp = getTezOp();
-            tezPlan.add(newTezOp);
-            POLocalRearrangeTez[] outputs = new POLocalRearrangeTez[compiledInputs.length];
+            // Without VertexGroup (UnionOptimizer), there is an extra union vertex
+            // which unions input from the two predecessor vertices
+            TezOperator unionTezOp = getTezOp();
+            tezPlan.add(unionTezOp);
+            unionTezOp.markUnion();
+            unionTezOp.setRequestedParallelism(op.getRequestedParallelism());
+            POShuffledValueInputTez unionInput =  new POShuffledValueInputTez(OperatorKey.genOpKey(scope));
+            unionTezOp.plan.addAsLeaf(unionInput);
+
+            POValueOutputTez[] outputs = new POValueOutputTez[compiledInputs.length];
             for (int i = 0; i < compiledInputs.length; i++) {
                 TezOperator prevTezOp = compiledInputs[i];
-                TezCompilerUtil.connect(tezPlan, prevTezOp, newTezOp);
-                // TODO: Use POValueOutputTez instead of POLocalRearrange and
-                // unsorted shuffle with TEZ-661 and PIG-3775.
-                outputs[i] = localRearrangeFactory.create();
-                outputs[i].setUnion(true);
+                // Some predecessors of union need not be part of the union (For eg: replicated join).
+                // So mark predecessors that are input to the union operation.
+                unionTezOp.addUnionPredecessor(prevTezOp.getOperatorKey());
+                TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, prevTezOp, unionTezOp);
+                TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.SCATTER_GATHER);
+                outputs[i] = new POValueOutputTez(OperatorKey.genOpKey(scope));
+                outputs[i].addOutputKey(unionTezOp.getOperatorKey().toString());
+                unionInput.addInputKey(prevTezOp.getOperatorKey().toString());
                 prevTezOp.plan.addAsLeaf(outputs[i]);
                 prevTezOp.setClosed(true);
             }
-            OperatorKey unionKey = newTezOp.getOperatorKey();
-            newTezOp.markUnion();
-            curTezOp = newTezOp;
-
-            // Start a new TezOp so that the successor in physical plan can be
-            // added to it.
-            newTezOp = getTezOp();
-            tezPlan.add(newTezOp);
-            tezPlan.connect(curTezOp, newTezOp);
-
-            // Connect the POValueOutputTezs in the predecessor vertices to the
-            // succeeding vertex.
-            for (int i = 0; i < outputs.length; i++) {
-                outputs[i].setOutputKey(newTezOp.getOperatorKey().toString());
-            }
-            // The first operator in the succeeding vertex must be
-            // POVertexGroupInputTez.
-            POVertexGroupInputTez grpInput = new POVertexGroupInputTez(newTezOp.getOperatorKey());
-            grpInput.setInputKey(unionKey.toString());
-            grpInput.setAlias(op.getAlias());
-            newTezOp.plan.add(grpInput);
-            curTezOp = newTezOp;
 
-            curTezOp.setRequestedParallelism(op.getRequestedParallelism());
+            curTezOp = unionTezOp;
             phyToTezOpMap.put(op, curTezOp);
         } catch (Exception e) {
             int errCode = 2034;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Apr  1 19:49:09 2014
@@ -76,7 +76,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
-import org.apache.pig.data.BinSedesTuple;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -94,8 +93,6 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
@@ -111,9 +108,8 @@ import org.apache.tez.mapreduce.hadoop.M
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 /**
  * A visitor to construct DAG out of Tez plan.
@@ -148,7 +144,7 @@ public class TezDagBuilder extends TezOp
         // Construct vertex for the current Tez operator
         Vertex to = null;
         try {
-            if (!tezOp.isAliasVertex()) {
+            if (!tezOp.isVertexGroup()) {
                 boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false;
                 to = newVertex(tezOp, isMap);
                 dag.addVertex(to);
@@ -170,16 +166,16 @@ public class TezDagBuilder extends TezOp
                 // must have already been created.
                 TezOperator pred = predecessors.get(i);
                 try {
-                    if (pred.isAliasVertex()) {
-                        VertexGroup from = pred.getVertexGroup();
-                        GroupInputEdge edge = newGroupInputEdge(from, to);
+                    if (pred.isVertexGroup()) {
+                        VertexGroup from = pred.getVertexGroupInfo().getVertexGroup();
+                        GroupInputEdge edge = newGroupInputEdge(pred, tezOp, from, to);
                         dag.addEdge(edge);
                     } else {
                         Vertex from = dag.getVertex(pred.getOperatorKey().toString());
-                        EdgeProperty prop = newEdge(pred, tezOp);
-                        if (tezOp.isAliasVertex()) {
+                        if (tezOp.isVertexGroup()) {
                             groupMembers[i] = from;
                         } else {
+                            EdgeProperty prop = newEdge(pred, tezOp);
                             Edge edge = new Edge(from, to, prop);
                             dag.addEdge(edge);
                         }
@@ -190,28 +186,30 @@ public class TezDagBuilder extends TezOp
                 }
             }
 
-            if (tezOp.isAliasVertex()) {
+            if (tezOp.isVertexGroup()) {
                 String groupName = tezOp.getOperatorKey().toString();
-                tezOp.setVertexGroup(dag.createVertexGroup(groupName, groupMembers));
+                VertexGroup vertexGroup = dag.createVertexGroup(groupName, groupMembers);
+                tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup);
+                POStore store = tezOp.getVertexGroupInfo().getStore();
+                if (store != null) {
+                    vertexGroup.addOutput(store.getOperatorKey().toString(),
+                            tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
+                            MROutputCommitter.class);
+                }
             }
         }
     }
 
-    private GroupInputEdge newGroupInputEdge(VertexGroup from, Vertex to)
-            throws IOException {
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
-        setIntermediateInputKeyValue(DataType.TUPLE, conf, null);
-        setIntermediateOutputKeyValue(DataType.TUPLE, conf, null);
-        MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
+    private GroupInputEdge newGroupInputEdge(TezOperator fromOp,
+            TezOperator toOp, VertexGroup from, Vertex to) throws IOException {
 
-        return new GroupInputEdge(from, to, new EdgeProperty(
-                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-                SchedulingType.SEQUENTIAL,
-                new OutputDescriptor(OnFileSortedOutput.class.getName())
-                    .setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
-                new InputDescriptor(ShuffledMergedInput.class.getName())
-                    .setUserPayload(TezUtils.createUserPayloadFromConf(conf))),
-                    new InputDescriptor(ConcatenatedMergedKeyValuesInput.class.getName()));
+        EdgeProperty edgeProperty = newEdge(fromOp, toOp);
+
+        String groupInputClass = edgeProperty.getDataMovementType().equals(
+                DataMovementType.SCATTER_GATHER) ? ConcatenatedMergedKeyValuesInput.class
+                .getName() : ConcatenatedMergedKeyValueInput.class.getName();
+        return new GroupInputEdge(from, to, edgeProperty,
+                new InputDescriptor(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
     }
 
     /**
@@ -249,25 +247,6 @@ public class TezDagBuilder extends TezOp
             }
         }
 
-        //TODO: Remove this and set the classes on edge in TezCompiler
-        List<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(from.plan,
-                POValueOutputTez.class);
-        if (!valueOutputs.isEmpty()) {
-            POValueOutputTez valueOutput = valueOutputs.get(0);
-            for (String outputKey : valueOutput.outputKeys) {
-                if (outputKey.equals(to.getOperatorKey().toString())) {
-                    conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
-                            POValueOutputTez.EmptyWritable.class.getName());
-                    conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
-                            BinSedesTuple.class.getName());
-                    conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
-                            POValueOutputTez.EmptyWritable.class.getName());
-                    conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
-                            BinSedesTuple.class.getName());
-                }
-            }
-        }
-
         conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
                 MRPartitioner.class.getName());
 
@@ -285,6 +264,13 @@ public class TezDagBuilder extends TezOp
                     edge.getIntermediateOutputValueClass());
         }
 
+        if (edge.getIntermediateOutputKeyComparatorClass() != null) {
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+                    edge.getIntermediateOutputKeyComparatorClass());
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+                    edge.getIntermediateOutputKeyComparatorClass());
+        }
+
         conf.setBoolean("mapred.mapper.new-api", true);
         conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
 
@@ -442,11 +428,15 @@ public class TezDagBuilder extends TezOp
                 if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) {
                     // skip sample vertex input
                 } else {
+                    String inputKey = pred.getOperatorKey().toString();
+                    if (pred.isVertexGroup()) {
+                        pred = mPlan.getOperator(pred.getVertexGroupPredecessors().get(0));
+                    }
                     LinkedList<POLocalRearrangeTez> lrs =
                             PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
                     for (POLocalRearrangeTez lr : lrs) {
                         if (lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
-                            localRearrangeMap.put((int)lr.getIndex(), pred.getOperatorKey().toString());
+                            localRearrangeMap.put((int)lr.getIndex(), inputKey);
                         }
                     }
                 }
@@ -554,10 +544,16 @@ public class TezDagBuilder extends TezOp
             outputPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
                     ObjectSerializer.serialize(singleStore));
 
+            OutputDescriptor storeOutDescriptor = new OutputDescriptor(
+                    MROutput.class.getName()).setUserPayload(TezUtils
+                    .createUserPayloadFromConf(outputPayLoad));
+            if (tezOp.getVertexGroupStores() != null) {
+                if (tezOp.getVertexGroupStores().containsKey(store.getOperatorKey())) {
+                    continue;
+                }
+            }
             vertex.addOutput(store.getOperatorKey().toString(),
-                    new OutputDescriptor(MROutput.class.getName())
-                            .setUserPayload(TezUtils.createUserPayloadFromConf(outputPayLoad)),
-                            MROutputCommitter.class);
+                    storeOutDescriptor, MROutputCommitter.class);
         }
 
         if (stores.size() > 0) {
@@ -651,6 +647,7 @@ public class TezDagBuilder extends TezOp
             IOException {
         LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
                 tezOp.plan, POStore.class);
+
         if (stores.size() > 0) {
 
             ArrayList<POStore> storeLocations = new ArrayList<POStore>();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java Tue Apr  1 19:49:09 2014
@@ -49,6 +49,7 @@ public class TezEdgeDescriptor {
 
     private String intermediateOutputKeyClass;
     private String intermediateOutputValueClass;
+    private String intermediateOutputKeyComparatorClass;
 
     public TezEdgeDescriptor() {
         combinePlan = new PhysicalPlan();
@@ -98,4 +99,13 @@ public class TezEdgeDescriptor {
         this.intermediateOutputValueClass = intermediateOutputValueClass;
     }
 
+    public String getIntermediateOutputKeyComparatorClass() {
+        return intermediateOutputKeyComparatorClass; // TezDagBuilder applies this only when non-null
+    }
+
+    public void setIntermediateOutputKeyComparatorClass(
+            String intermediateOutputKeyComparatorClass) { // class name; used for both output and input key comparators
+        this.intermediateOutputKeyComparatorClass = intermediateOutputKeyComparatorClass;
+    }
+
 }

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java?rev=1583768&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezInput.java Tue Apr  1 19:49:09 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.tez.runtime.api.LogicalInput;
+
+/**
+ * This interface is implemented by PhysicalOperators that can have Tez inputs
+ * attached directly to the operator.
+ */
+
+public interface TezInput {
+
+    public String[] getTezInputs(); // keys of the Tez inputs this operator reads
+
+    public void replaceInput(String oldInputKey, String newInputKey); // swap oldInputKey for newInputKey if it is read here
+
+    /**
+     * Adds to the set of inputs whose download can be skipped because they are already in the vertex cache
+     *
+     * @param inputsToSkip mutable set of input keys; implementations add the ones they can skip
+     */
+    public void addInputsToSkip(Set<String> inputsToSkip);
+
+    public void attachInputs(Map<String, LogicalInput> inputs, // PigProcessor calls this after addInputsToSkip
+            Configuration conf) throws ExecException;
+
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Tue Apr  1 19:49:09 2014
@@ -32,6 +32,7 @@ import org.apache.pig.backend.BackendExc
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
@@ -149,6 +150,7 @@ public class TezLauncher extends Launche
             throws PlanException, IOException, VisitorException {
         TezCompiler comp = new TezCompiler(php, pc, tezResourceManager);
         TezOperPlan tezPlan = comp.compile();
+
         boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
                 PigConfiguration.PROP_NO_COMBINER, "false"));
 
@@ -188,6 +190,14 @@ public class TezLauncher extends Launche
             accum.visit();
         }
 
+        boolean isUnionOpt = "true".equalsIgnoreCase(pc.getProperties()
+                .getProperty(PigConfiguration.TEZ_OPT_UNION, "false"));
+        // Use VertexGroup in Tez
+        if (isUnionOpt) {
+            UnionOptimizer uo = new UnionOptimizer(tezPlan);
+            uo.visit();
+        }
+
         return comp.getPlanContainer();
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Tue Apr  1 19:49:09 2014
@@ -29,6 +29,7 @@ import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -127,11 +128,23 @@ public class TezOperPlan extends Operato
 
     @Override
     public void remove(TezOperator op) {
-        //TODO Cleanup outEdges of predecessors and inEdges of successors
-        //TezDAGBuilder would not create the edge. So low priority
+        // The remove method does not replace output and input keys in TezInput
+        // and TezOutput. That has to be handled separately.
+        for (OperatorKey opKey : op.outEdges.keySet()) {
+            getOperator(opKey).inEdges.remove(op.getOperatorKey());
+        }
+        for (OperatorKey opKey : op.inEdges.keySet()) {
+            getOperator(opKey).outEdges.remove(op.getOperatorKey());
+        }
         super.remove(op);
     }
 
+    /**
+     * Disconnects {@code from} and {@code to} in the plan and drops the Tez
+     * edge between them: the out-edge recorded on {@code from} and the
+     * matching in-edge recorded on {@code to}.
+     *
+     * @return the result of {@link OperatorPlan#disconnect}
+     */
+    @Override
+    public boolean disconnect(TezOperator from, TezOperator to) {
+        from.outEdges.remove(to.getOperatorKey());
+        // The edge from -> to is stored as an in-edge on the target operator
+        // (keyed by the source), mirroring how remove() cleans up edges.
+        to.inEdges.remove(from.getOperatorKey());
+        return super.disconnect(from, to);
+    }
 
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Tue Apr  1 19:49:09 2014
@@ -18,15 +18,20 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.VertexGroup;
 
 import com.google.common.collect.Maps;
@@ -126,8 +131,11 @@ public class TezOperator extends Operato
 
     OPER_FEATURE feature = OPER_FEATURE.NONE;
 
+    // Keys of the operators that are inputs of this operator when it is a
+    // vertex group; null until a predecessor is added or the list is set.
+    private List<OperatorKey> vertexGroupPredecessors;
     // For union
-    private VertexGroup group = null;
+    // Non-null only when this operator represents a Tez VertexGroup.
+    private VertexGroupInfo vertexGroupInfo;
+    // Maps the OperatorKey of a POStore to the OperatorKey of the vertex
+    // group TezOperator it is associated with; null until the first entry.
+    private Map<OperatorKey, OperatorKey> vertexGroupStores = null;
 
     public TezOperator(OperatorKey k) {
         super(k);
@@ -245,18 +253,48 @@ public class TezOperator extends Operato
         this.useSecondaryKey = useSecondaryKey;
     }
 
-    public void setVertexGroup(VertexGroup group) {
-        this.group = group;
+    /**
+     * Returns the same list as {@link #getVertexGroupPredecessors()}.
+     *
+     * @return keys of the vertex group predecessors, or null if none were added
+     */
+    public List<OperatorKey> getUnionPredecessors() {
+        return vertexGroupPredecessors;
     }
 
-    public VertexGroup getVertexGroup() {
-        return this.group;
+    /**
+     * @return keys of the operators that are inputs of this vertex group, or
+     *         null if none were added
+     */
+    public List<OperatorKey> getVertexGroupPredecessors() {
+        return vertexGroupPredecessors;
+    }
+
+    /**
+     * Appends a predecessor key, creating the list on first use.
+     *
+     * @param unionPredecessor key of the predecessor operator to record
+     */
+    public void addUnionPredecessor(OperatorKey unionPredecessor) {
+        if (vertexGroupPredecessors == null) {
+            vertexGroupPredecessors = new ArrayList<OperatorKey>();
+        }
+        this.vertexGroupPredecessors.add(unionPredecessor);
+    }
+
+    /**
+     * Replaces the predecessor list. The given list is stored as-is (not copied).
+     *
+     * @param vertexGroupPredecessors keys of the predecessor operators
+     */
+    public void setVertexGroupPredecessors(List<OperatorKey> vertexGroupPredecessors) {
+        this.vertexGroupPredecessors = vertexGroupPredecessors;
     }
 
     // Union is the only operator that uses alias vertex (VertexGroup) now. But
     // more operators could be added to the list in the future.
-    public boolean isAliasVertex() {
-        return isUnion();
+    /**
+     * @return true if a VertexGroupInfo has been set on this operator
+     */
+    public boolean isVertexGroup() {
+        return vertexGroupInfo != null;
+    }
+
+    public VertexGroupInfo getVertexGroupInfo() {
+        return vertexGroupInfo;
+    }
+
+    public void setVertexGroupInfo(VertexGroupInfo vertexGroup) {
+        this.vertexGroupInfo = vertexGroup;
+    }
+
+    public void addVertexGroupStore(OperatorKey storeKey, OperatorKey vertexGroupKey) {
+        if (this.vertexGroupStores == null) {
+            this.vertexGroupStores = new HashMap<OperatorKey, OperatorKey>();
+        }
+        this.vertexGroupStores.put(storeKey, vertexGroupKey);
+    }
+
+    public Map<OperatorKey, OperatorKey> getVertexGroupStores() {
+        return this.vertexGroupStores;
     }
 
     @Override
@@ -353,5 +391,60 @@ public class TezOperator extends Operato
         return combineSmallSplits;
     }
 
+    /**
+     * Describes an operator realized as a Tez VertexGroup: the keys of the
+     * operators feeding it, its output key and, optionally, the POStore it is
+     * associated with together with that store's OutputDescriptor.
+     */
+    public static class VertexGroupInfo {
+
+        private List<OperatorKey> inputKeys;
+        private String outputKey;
+        private POStore store;
+        private OutputDescriptor storeOutDescriptor;
+        private VertexGroup vertexGroup;
+
+        /** Creates an instance with no associated store. */
+        public VertexGroupInfo() {
+        }
+
+        /** Creates an instance associated with the given store. */
+        public VertexGroupInfo(POStore store) {
+            this.store = store;
+        }
+
+        /** @return input operator keys in insertion order, or null if none were added */
+        public List<OperatorKey> getInputs() {
+            return this.inputKeys;
+        }
+
+        /** Appends an input operator key, allocating the list on first use. */
+        public void addInput(OperatorKey input) {
+            List<OperatorKey> keys = this.inputKeys;
+            if (keys == null) {
+                keys = new ArrayList<OperatorKey>();
+                this.inputKeys = keys;
+            }
+            keys.add(input);
+        }
+
+        /** @return the output key, or null if not set */
+        public String getOutput() {
+            return this.outputKey;
+        }
+
+        /** Sets the output key. */
+        public void setOutput(String output) {
+            outputKey = output;
+        }
+
+        /** @return the associated store, or null if constructed without one */
+        public POStore getStore() {
+            return this.store;
+        }
+
+        /** @return the OutputDescriptor of the store, or null if not set */
+        public OutputDescriptor getStoreOutputDescriptor() {
+            return this.storeOutDescriptor;
+        }
+
+        /** Sets the OutputDescriptor of the store. */
+        public void setStoreOutputDescriptor(OutputDescriptor storeOutDescriptor) {
+            this.storeOutDescriptor = storeOutDescriptor;
+        }
+
+        /** @return the Tez VertexGroup, or null if not set */
+        public VertexGroup getVertexGroup() {
+            return this.vertexGroup;
+        }
+
+        /** Sets the Tez VertexGroup. */
+        public void setVertexGroup(VertexGroup vertexGroup) {
+            this.vertexGroup = vertexGroup;
+        }
+
+    }
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java Tue Apr  1 19:49:09 2014
@@ -30,6 +30,12 @@ import org.apache.tez.runtime.api.Logica
  */
 
 public interface TezOutput {
+
+    /**
+     * @return the keys of the Tez outputs used by this operator
+     */
+    public String[] getTezOutputs();
+
+    /**
+     * Replaces the output key {@code oldOutputKey} with {@code newOutputKey}.
+     *
+     * @param oldOutputKey the output key currently referenced
+     * @param newOutputKey the output key to reference instead
+     */
+    public void replaceOutput(String oldOutputKey, String newOutputKey);
+
+    /**
+     * Attaches the Tez logical outputs to this operator.
+     *
+     * @param outputs logical outputs keyed by output key
+     * @param conf the job configuration
+     * @throws ExecException on failure to attach the outputs
+     */
     public void attachOutputs(Map<String, LogicalOutput> outputs,
             Configuration conf) throws ExecException;
+
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java Tue Apr  1 19:49:09 2014
@@ -66,6 +66,10 @@ public class TezPOPackageAnnotator exten
         List<TezOperator> preds = this.mPlan.getPredecessors(pkgTezOp);
         for (Iterator<TezOperator> it = preds.iterator(); it.hasNext();) {
             TezOperator predTezOp = it.next();
+            if (predTezOp.isVertexGroup()) {
+                // Just get one of the inputs to vertex group
+                predTezOp = getPlan().getOperator(predTezOp.getVertexGroupPredecessors().get(0));
+            }
             lrFound += patchPackage(predTezOp, pkgTezOp, pkg);
             if(lrFound == pkg.getNumInps()) {
                 break;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Tue Apr  1 19:49:09 2014
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator.VertexGroupInfo;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -51,8 +52,11 @@ public class TezPrinter extends TezOpPla
 
     @Override
     public void visitTezOp(TezOperator tezOper) throws VisitorException {
-        if (tezOper.isAliasVertex()) {
-            mStream.println("Tez vertex group " + tezOper.getOperatorKey().toString());
+        if (tezOper.isVertexGroup()) {
+            VertexGroupInfo info = tezOper.getVertexGroupInfo();
+            mStream.println("Tez vertex group "
+                    + tezOper.getOperatorKey().toString() + "\t<-\t "
+                    + info.getInputs() + "\t->\t " + info.getOutput());
             mStream.println("# No plan on vertex group");
         } else {
             mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java Tue Apr  1 19:49:09 2014
@@ -33,7 +33,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -47,7 +47,7 @@ import org.apache.tez.runtime.library.ap
 /**
  * POCounterStatsTez is used to group counters from previous vertex POCounterTez tasks
  */
-public class POCounterStatsTez extends PhysicalOperator implements TezLoad, TezOutput {
+public class POCounterStatsTez extends PhysicalOperator implements TezInput, TezOutput {
 
     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(POCounterStatsTez.class);
@@ -63,6 +63,18 @@ public class POCounterStatsTez extends P
     }
 
     @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        // Only the single input key can be replaced; other keys are ignored.
+        if (!oldInputKey.equals(inputKey)) {
+            return;
+        }
+        inputKey = newInputKey;
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
+        // Intentionally empty: this operator adds no inputs to skip.
     }
 
@@ -83,6 +95,18 @@ public class POCounterStatsTez extends P
     }
 
     @Override
+    public String[] getTezOutputs() {
+        return new String[] { outputKey };
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        // Only the single output key can be replaced; other keys are ignored.
+        if (!oldOutputKey.equals(outputKey)) {
+            return;
+        }
+        outputKey = newOutputKey;
+    }
+
+    @Override
     public void attachOutputs(Map<String, LogicalOutput> outputs,
             Configuration conf) throws ExecException {
         LogicalOutput output = outputs.get(outputKey);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java Tue Apr  1 19:49:09 2014
@@ -54,6 +54,12 @@ public class POCounterTez extends POCoun
         super(copy);
     }
 
+    @Override
+    public void initialize(TezProcessorContext processorContext)
+            throws ExecException {
+        this.setTaskId(processorContext.getTaskIndex());
+    }
+
     public void setTuplesOutputKey(String tuplesOutputKey) {
         this.tuplesOutputKey = tuplesOutputKey;
     }
@@ -63,9 +69,17 @@ public class POCounterTez extends POCoun
     }
 
     @Override
-    public void initialize(TezProcessorContext processorContext)
-            throws ExecException {
-        this.setTaskId(processorContext.getTaskIndex());
+    public String[] getTezOutputs() {
+        return new String[] { tuplesOutputKey, statsOutputKey };
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        // The tuples key is checked first; at most one key is replaced.
+        if (oldOutputKey.equals(tuplesOutputKey)) {
+            tuplesOutputKey = newOutputKey;
+            return;
+        }
+        if (oldOutputKey.equals(statsOutputKey)) {
+            statsOutputKey = newOutputKey;
+        }
     }
 
     @Override

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java?rev=1583768&r1=1583767&r2=1583768&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java Tue Apr  1 19:49:09 2014
@@ -32,13 +32,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.tez.ObjectCache;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
-public class PORankTez extends PORank implements TezLoad {
+public class PORankTez extends PORank implements TezInput {
 
     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(PORankTez.class);
@@ -62,6 +62,20 @@ public class PORankTez extends PORank im
     }
 
     @Override
+    public String[] getTezInputs() {
+        return new String[] { tuplesInputKey, statsInputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        // The tuples key is checked first; at most one key is replaced.
+        if (oldInputKey.equals(tuplesInputKey)) {
+            tuplesInputKey = newInputKey;
+            return;
+        }
+        if (oldInputKey.equals(statsInputKey)) {
+            statsInputKey = newInputKey;
+        }
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
         String cacheKey = "rankstats-" + getOperatorKey().toString();
         Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);