You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/17 09:26:52 UTC

svn commit: r1568901 - in /pig/branches/tez/src/org/apache/pig: backend/hadoop/executionengine/physicalLayer/expressionOperators/ backend/hadoop/executionengine/physicalLayer/plans/ backend/hadoop/executionengine/physicalLayer/relationalOperators/ back...

Author: daijy
Date: Mon Feb 17 08:26:52 2014
New Revision: 1568901

URL: http://svn.apache.org/r1568901
Log:
PIG-3757: Make scalar work

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
    pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Feb 17 08:26:52 2014
@@ -538,6 +538,11 @@ public class POUserFunc extends Expressi
     public FuncSpec getFuncSpec() {
         return funcSpec;
     }
+    
+    public void setFuncSpec(FuncSpec funcSpec) {
+        this.funcSpec = funcSpec;
+        instantiateFunc(funcSpec);
+    }
 
     public String[] getCacheFiles() {
         return cacheFiles;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Mon Feb 17 08:26:52 2014
@@ -294,7 +294,12 @@ public class PhyPlanVisitor extends Plan
     }
 
     public void visitLimit(POLimit lim) throws VisitorException{
-        //do nothing
+        PhysicalPlan inpPlan = lim.getLimitPlan();
+        if (inpPlan!=null) {
+            pushWalker(mCurrentWalker.spawnChildWalker(inpPlan));
+            visit();
+            popWalker();
+        }
     }
 
     public void visitCross(POCross cross) throws VisitorException{

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Mon Feb 17 08:26:52 2014
@@ -59,7 +59,7 @@ public class CombinerPackager extends Pa
         super();
         keyType = pkgr.keyType;
         numInputs = 1;
-        inner = new boolean[1];
+        inner = new boolean[pkgr.inner.length];
         for (int i = 0; i < pkgr.inner.length; i++) {
             inner[i] = true;
         }

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1568901&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Mon Feb 17 08:26:52 2014
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class MultiQueryOptimizerTez extends TezOpPlanVisitor {
+    public MultiQueryOptimizerTez(TezOperPlan plan) {
+        super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        try {
+            if (!tezOp.isSplitter()) {
+                return;
+            }
+
+            List<TezOperator> splittees = new ArrayList<TezOperator>();
+
+            List<TezOperator> successors = getPlan().getSuccessors(tezOp);
+            List<TezOperator> succ_successors = new ArrayList<TezOperator>();
+            for (TezOperator successor : successors) {
+                // don't want to be complicated by nested split
+                if (successor.isSplitter()) {
+                    continue;
+                }
+                // If has other dependency, don't merge into split,
+                if (getPlan().getPredecessors(successor).size()!=1) {
+                    continue;
+                }
+                boolean containsBlacklistedOp = false;
+                for (PhysicalOperator op : successor.plan) {
+                    if (op instanceof POReservoirSample || op instanceof POPoissonSample) {
+                        containsBlacklistedOp = true;
+                        break;
+                    }
+                }
+                if (containsBlacklistedOp) {
+                    continue;
+                }
+                // Detect diamond shape, we cannot merge it into split, since Tez
+                // does not handle double edge between vertexes
+                boolean sharedSucc = false;
+                if (getPlan().getSuccessors(successor)!=null) {
+                    for (TezOperator succ_successor : getPlan().getSuccessors(successor)) {
+                        if (succ_successors.contains(succ_successor)) {
+                            sharedSucc = true;
+                            break;
+                        }
+                    }
+                    succ_successors.addAll(getPlan().getSuccessors(successor));
+                }
+                if (sharedSucc) {
+                    continue;
+                }
+                splittees.add(successor);
+            }
+
+            if (splittees.size()==0) {
+                return;
+            }
+
+            if (splittees.size()==1 && successors.size()==1) {
+                // We don't need a POSplit here, we can merge the splittee into spliter
+                PhysicalOperator firstNodeLeaf = tezOp.plan.getLeaves().get(0);
+                PhysicalOperator firstNodeLeafPred  = tezOp.plan.getPredecessors(firstNodeLeaf).get(0);
+
+                TezOperator singleSplitee = splittees.get(0);
+                PhysicalOperator secondNodeRoot =  singleSplitee.plan.getRoots().get(0);
+                PhysicalOperator secondNodeSucc = singleSplitee.plan.getSuccessors(secondNodeRoot).get(0);
+
+                tezOp.plan.remove(firstNodeLeaf);
+                singleSplitee.plan.remove(secondNodeRoot);
+                
+                //TODO remove filter all
+
+                tezOp.plan.merge(singleSplitee.plan);
+                tezOp.plan.connect(firstNodeLeafPred, secondNodeSucc);
+
+                addSubPlanPropertiesToParent(tezOp, singleSplitee);
+                
+                removeSplittee(getPlan(), tezOp, singleSplitee);
+            } else {
+                POValueOutputTez valueOutput = (POValueOutputTez)tezOp.plan.getLeaves().get(0);
+                POSplit split = new POSplit(OperatorKey.genOpKey(valueOutput.getOperatorKey().getScope()));
+                for (TezOperator splitee : splittees) {
+                    PhysicalOperator spliteeRoot =  splitee.plan.getRoots().get(0);
+                    splitee.plan.remove(spliteeRoot);
+                    split.addPlan(splitee.plan);
+                    
+                    addSubPlanPropertiesToParent(tezOp, splitee);
+                    
+                    removeSplittee(getPlan(), tezOp, splitee);
+                    valueOutput.outputKeys.remove(splitee.getOperatorKey().toString());
+                }
+                if (!valueOutput.outputKeys.isEmpty()) {
+                    // We still need valueOutput
+                    PhysicalPlan phyPlan = new PhysicalPlan();
+                    phyPlan.addAsLeaf(valueOutput);
+                    split.addPlan(phyPlan);
+                }
+                PhysicalOperator pred = tezOp.plan.getPredecessors(valueOutput).get(0);
+                tezOp.plan.disconnect(pred, valueOutput);
+                tezOp.plan.remove(valueOutput);
+                tezOp.plan.add(split);
+                tezOp.plan.connect(pred, split);
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+    
+    static public void removeSplittee(TezOperPlan plan, TezOperator splitter, TezOperator splittee) throws PlanException {
+        if (plan.getSuccessors(splittee)!=null) {
+            List<TezOperator> succs = new ArrayList<TezOperator>();
+            succs.addAll(plan.getSuccessors(splittee));
+            plan.disconnect(splitter, splittee);
+            for (TezOperator succTezOperator : succs) {
+                TezEdgeDescriptor edge = succTezOperator.inEdges.get(splittee.getOperatorKey());
+
+                splitter.outEdges.remove(splittee.getOperatorKey());
+                succTezOperator.inEdges.remove(splittee.getOperatorKey());
+                plan.disconnect(splittee, succTezOperator);
+                TezCompilerUtil.connect(plan, splitter, succTezOperator, edge);
+            }
+        }
+        plan.remove(splittee);
+    }
+
+    static public void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) {
+        if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
+            parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
+        }
+        subPlanOper.setRequestedParallelismByReference(parentOper);
+        if (subPlanOper.UDFs != null) {
+            parentOper.UDFs.addAll(subPlanOper.UDFs);
+        }
+        if (subPlanOper.scalars != null) {
+            parentOper.scalars.addAll(subPlanOper.scalars);
+        }
+        if (subPlanOper.outEdges != null) {
+            for (Entry<OperatorKey, TezEdgeDescriptor> entry: subPlanOper.outEdges.entrySet()) {
+                parentOper.outEdges.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Feb 17 08:26:52 2014
@@ -33,12 +33,14 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.BinSedesTuple;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.ReadScalarsTez;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.tez.common.TezUtils;
@@ -158,6 +160,12 @@ public class PigProcessor implements Log
         for (POValueInputTez input : valueInputs){
             input.attachInputs(inputs, conf);
         }
+        LinkedList<POUserFunc> scalarInputs = PlanHelper.getPhysicalOperators(execPlan, POUserFunc.class);
+        for (POUserFunc userFunc : scalarInputs ) {
+            if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                ((ReadScalarsTez)userFunc.getFunc()).attachInputs(inputs, conf);
+            }
+        }
         LinkedList<POFRJoinTez> broadcasts = PlanHelper.getPhysicalOperators(execPlan, POFRJoinTez.class);
         for (POFRJoinTez broadcast : broadcasts){
             broadcast.attachInputs(inputs, conf);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java Mon Feb 17 08:26:52 2014
@@ -65,6 +65,10 @@ public class SecondaryKeyOptimizerTez ex
                     break;
                 }
             }
+            
+            if (connectingLR == null) {
+                continue;
+            }
 
             // Detected the POLocalRearrange -> POPackage pattern. Let's add
             // combiner if possible.

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Feb 17 08:26:52 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -84,6 +85,7 @@ import org.apache.pig.impl.builtin.Defau
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.GetMemNumRows;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.builtin.ReadScalarsTez;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -223,17 +225,6 @@ public class TezCompiler extends PhyPlan
 
         for (PhysicalOperator op : ops) {
             compile(op);
-            if (curTezOp.isSplitSubPlan()) {
-                // Set inputs to null as POSplit will attach input to roots
-                for (PhysicalOperator root : curTezOp.plan.getRoots()) {
-                    root.setInputs(null);
-                }
-                TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey());
-                POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey());
-                split.addPlan(curTezOp.plan);
-                addSubPlanPropertiesToParent(splitOp, curTezOp);
-                curTezOp = splitOp;
-            }
         }
 
         for (TezOperator tezOper : splitsSeen.values()) {
@@ -245,39 +236,42 @@ public class TezCompiler extends PhyPlan
             }
             tezOper.setClosed(true);
         }
-
-        connectSoftLink();
+        
+        fixScalar();
 
         return tezPlan;
     }
+    
+    private void fixScalar() throws VisitorException, PlanException {
+        // Mapping POStore to POValueOuptut
+        Map<POStore, POValueOutputTez> storeSeen = new HashMap<POStore, POValueOutputTez>();
+        
+        for (TezOperator tezOp : tezPlan) {
+            List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
+            for (POUserFunc userFunc : userFuncs) {
+                if (userFunc.getReferencedOperator()!=null) {  // Scalar
+                    POStore store = (POStore)userFunc.getReferencedOperator();
+ 
+                    TezOperator from = phyToTezOpMap.get(store);
 
-    private void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) {
-        if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
-            parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
-        }
-        subPlanOper.setRequestedParallelismByReference(parentOper);
-        if (subPlanOper.UDFs != null) {
-            parentOper.UDFs.addAll(subPlanOper.UDFs);
-        }
-        if (subPlanOper.outEdges != null) {
-            for (Entry<OperatorKey, TezEdgeDescriptor> entry: subPlanOper.outEdges.entrySet()) {
-                parentOper.outEdges.put(entry.getKey(), entry.getValue());
-            }
-        }
-    }
+                    FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
+                    userFunc.setFuncSpec(newSpec);
 
-    private void connectSoftLink() throws PlanException, IOException {
-        for (PhysicalOperator op : plan) {
-            if (plan.getSoftLinkPredecessors(op)!=null) {
-                for (PhysicalOperator pred : plan.getSoftLinkPredecessors(op)) {
-                    TezOperator from = phyToTezOpMap.get(pred);
-                    TezOperator to = phyToTezOpMap.get(op);
-                    if (from==to) {
-                        continue;
-                    }
-                    if (tezPlan.getPredecessors(to)==null || !tezPlan.getPredecessors(to).contains(from)) {
-                        tezPlan.connect(from, to);
+                    if (storeSeen.containsKey(store)) {
+                        storeSeen.get(store).outputKeys.add(tezOp.getOperatorKey().toString());
+                    } else {
+                        POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+                        output.addOutputKey(tezOp.getOperatorKey().toString());
+                        from.plan.remove(from.plan.getOperator(store.getOperatorKey()));
+                        from.plan.addAsLeaf(output);
+                        storeSeen.put(store, output);
                     }
+
+                    TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, from, tezOp);
+                    //TODO shared edge once support is available in Tez
+                    edge.dataMovementType = DataMovementType.BROADCAST;
+                    edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+                    edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
                 }
             }
         }
@@ -314,33 +308,66 @@ public class TezCompiler extends PhyPlan
                 }
 
                 PhysicalOperator p = predecessors.get(0);
-                TezOperator oper = null;
+                TezOperator storeTezOper = null;
                 if (p instanceof POStore) {
-                    oper = phyToTezOpMap.get(p);
+                    storeTezOper = phyToTezOpMap.get(p);
                 } else {
                     int errCode = 2126;
-                    String msg = "Predecessor of load should be a store. Got "+p.getClass();
+                    String msg = "Predecessor of load should be a store. Got " + p.getClass();
                     throw new PlanException(msg, errCode, PigException.BUG);
                 }
-
-                // Need new operator
+                PhysicalOperator store = storeTezOper.plan.getOperator(p.getOperatorKey());
+                // replace POStore to POValueOutputTez, convert the tezOperator to splitter
+                storeTezOper.plan.disconnect(storeTezOper.plan.getPredecessors(store).get(0), store);
+                storeTezOper.plan.remove(store);
+                POValueOutputTez valueOutput = new POValueOutputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                storeTezOper.plan.addAsLeaf(valueOutput);
+                storeTezOper.setSplitter(true);
+                
+                // Create a splittee of store only
+                TezOperator storeOnlyTezOperator = getTezOp();
+                PhysicalPlan storeOnlyPhyPlan = new PhysicalPlan();
+                POValueInputTez valueInput = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                valueInput.setInputKey(storeTezOper.getOperatorKey().toString());
+                storeOnlyPhyPlan.addAsLeaf(valueInput);
+                storeOnlyPhyPlan.addAsLeaf(store);
+                storeOnlyTezOperator.plan = storeOnlyPhyPlan;
+                tezPlan.add(storeOnlyTezOperator);
+                phyToTezOpMap.put(store, storeOnlyTezOperator);
+                
+                // Create new operator as second splittee
                 curTezOp = getTezOp();
-                curTezOp.plan.add(op);
+                POValueInputTez valueInput2 = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                valueInput2.setInputKey(storeTezOper.getOperatorKey().toString());
+                curTezOp.plan.add(valueInput2);
                 tezPlan.add(curTezOp);
 
-                plan.disconnect(op, p);
-                oper.segmentBelow = true;
-                tezPlan.connect(oper, curTezOp);
-                phyToTezOpMap.put(op, curTezOp);
+                // Connect splitter to splittee
+                TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, storeTezOper, storeOnlyTezOperator);
+                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+                edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+                edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+                storeOnlyTezOperator.setRequestedParallelismByReference(storeTezOper);
+                
+                edge = TezCompilerUtil.connect(tezPlan, storeTezOper, curTezOp);
+                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+                edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+                edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+                curTezOp.setRequestedParallelismByReference(storeTezOper);
+                
                 return;
             }
 
             Collections.sort(predecessors);
-            compiledInputs = new TezOperator[predecessors.size()];
-            int i = -1;
-            for (PhysicalOperator pred : predecessors) {
-                compile(pred);
-                compiledInputs[++i] = curTezOp;
+            if(op instanceof POSplit && splitsSeen.containsKey(op.getOperatorKey())){
+                // skip follow up POSplit
+            } else {
+                compiledInputs = new TezOperator[predecessors.size()];
+                int i = -1;
+                for (PhysicalOperator pred : predecessors) {
+                    compile(pred);
+                    compiledInputs[++i] = curTezOp;
+                }
             }
         } else {
             // No predecessors. Mostly a load. But this is where we start. We
@@ -365,20 +392,6 @@ public class TezCompiler extends PhyPlan
         compiledInputs = prevCompInp;
     }
 
-    /**
-     * Start a new TezOperator whose plan will be the sub-plan of POSplit
-     *
-     * @param splitOperatorKey
-     *            OperatorKey of the POSplit for which the new plan is a sub-plan
-     * @return the new TezOperator
-     * @throws PlanException
-     */
-    private TezOperator startNew(OperatorKey splitOperatorKey) throws PlanException {
-        TezOperator ret = getTezOp();
-        ret.setSplitOperatorKey(splitOperatorKey);
-        return ret;
-    }
-
     private void nonBlocking(PhysicalOperator op) throws PlanException, IOException {
         TezOperator tezOp;
         if (compiledInputs.length == 1) {
@@ -400,46 +413,11 @@ public class TezCompiler extends PhyPlan
         tezPlan.add(newTezOp);
         for (TezOperator tezOp : compiledInputs) {
             tezOp.setClosed(true);
-            handleSplitAndConnect(tezPlan, tezOp, newTezOp);
+            TezCompilerUtil.connect(tezPlan, tezOp, newTezOp);
         }
         curTezOp = newTezOp;
     }
 
-    private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan, TezOperator from, TezOperator to)
-            throws PlanException {
-        return handleSplitAndConnect(tezPlan, from, to, true);
-    }
-
-    private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan,
-            TezOperator from, TezOperator to, boolean addToSplitPlan)
-            throws PlanException {
-        // Add edge descriptors from POLocalRearrange in POSplit
-        // sub-plan to new operators
-        PhysicalOperator leaf = from.plan.getLeaves().get(0);
-        // It could be POStoreTez incase of sampling job in order by
-        if (leaf instanceof POLocalRearrangeTez) {
-            POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
-            lr.setOutputKey(to.getOperatorKey().toString());
-        }
-        TezEdgeDescriptor edge = null;
-        if (from.isSplitSubPlan()) {
-            TezOperator splitOp = splitsSeen.get(from.getSplitOperatorKey());
-            if (addToSplitPlan) {
-                // Set inputs to null as POSplit will attach input to roots
-                for (PhysicalOperator root : from.plan.getRoots()) {
-                    root.setInputs(null);
-                }
-                POSplit split = findPOSplit(splitOp, from.getSplitOperatorKey());
-                split.addPlan(from.plan);
-                addSubPlanPropertiesToParent(splitOp, curTezOp);
-            }
-            edge = TezCompilerUtil.connect(tezPlan, splitOp, to);
-        } else {
-            edge = TezCompilerUtil.connect(tezPlan, from, to);
-        }
-        return edge;
-    }
-
     private POSplit findPOSplit(TezOperator tezOp, OperatorKey splitKey)
             throws PlanException {
         POSplit split = (POSplit) tezOp.plan.getOperator(splitKey);
@@ -473,105 +451,6 @@ public class TezCompiler extends PhyPlan
     }
 
     /**
-     * Remove the operator and the whole tree connected to that operator from
-     * the plan. Only remove corresponding connected sub-plan if you encounter
-     * another Split operator in the predecessor.
-     *
-     * @param op Operator to remove
-     * @throws VisitorException
-     */
-    private void removeDupOpTreeOfSplit(TezOperPlan plan, TezOperator op, boolean isMultiQuery)
-            throws VisitorException {
-        Stack<TezOperator> stack = new Stack<TezOperator>();
-        stack.push(op);
-        while (!stack.isEmpty()) {
-            op = stack.pop();
-            List<TezOperator> predecessors = plan.getPredecessors(op);
-            if (predecessors != null) {
-                if (isMultiQuery) {
-                    for (TezOperator pred : predecessors) {
-                        if (!pred.isSplitOperator()) {
-                            stack.push(pred);
-                        } else {
-                            List<POSplit> splits = PlanHelper.getPhysicalOperators(
-                                    pred.plan, POSplit.class);
-                            for (POSplit split : splits) {
-                                PhysicalPlan planToRemove = null;
-                                for (PhysicalPlan splitPlan : split.getPlans()) {
-                                    PhysicalOperator phyOp = splitPlan
-                                            .getLeaves().get(0);
-                                    if (phyOp instanceof POLocalRearrangeTez) {
-                                        POLocalRearrangeTez lr = (POLocalRearrangeTez) phyOp;
-                                        if (lr.getOutputKey().equals(
-                                                op.getOperatorKey().toString())) {
-                                            planToRemove = splitPlan;
-                                            break;
-                                        }
-                                    }
-                                }
-                                if (planToRemove != null) {
-                                    split.getPlans().remove(planToRemove);
-                                    break;
-                                }
-                            }
-                        }
-                    }
-                } else {
-                    for (TezOperator pred : predecessors) {
-                        // Remove everything till we encounter another split
-                        if (!pred.isSplitOperator()) {
-                            stack.push(pred);
-                        } else {
-                            // If split operator, just remove from the output
-                            POValueOutputTez valueOut = (POValueOutputTez)pred.plan.getLeaves().get(0);
-                            valueOut.removeOutputKey(op.getOperatorKey().toString());
-                            //TODO Handle shared edge when available in Tez
-                            pred.outEdges.remove(op.getOperatorKey().toString());
-                        }
-                    }
-                }
-            }
-            plan.remove(op);
-        }
-    }
-
-    /**
-     * In case of mulitple levels of split, after removing duplicate tree we need to reset
-     * input of operators in the old tree as some of the inputs of the PhysicalOperator in
-     * original tree will now be overwritten and referring to operators in
-     * duplicate tree. For eg: POFilter inputs will refer to the duplicate tree's
-     * POValueInputTez even though it is connected to a original split tree's POValueInputTez
-     */
-    private void resetInputsOfPredecessors(TezOperPlan plan, TezOperator op) {
-        Stack<TezOperator> stack = new Stack<TezOperator>();
-        stack.push(op);
-        while (!stack.isEmpty()) {
-            op = stack.pop();
-            List<TezOperator> predecessors = plan.getPredecessors(op);
-            if (predecessors != null) {
-                for (TezOperator pred : predecessors) {
-                    resetInputs(pred.plan, pred.plan.getLeaves());
-                    if (!pred.isSplitOperator()) {
-                        stack.push(pred);
-                    }
-                }
-            }
-        }
-    }
-
-    private void resetInputs(PhysicalPlan plan, List<PhysicalOperator> ops) {
-        for (PhysicalOperator op : ops) {
-            List<PhysicalOperator> preds = plan.getPredecessors(op);
-            if (preds != null) {
-                for (PhysicalOperator pred : preds) {
-                    pred.setInputs(plan.getPredecessors(pred));
-                    resetInputs(plan, plan.getPredecessors(pred));
-                }
-            }
-        }
-    }
-
-    /**
      * Merges the TezOperators in the compiledInputs into a single merged
      * TezOperator.
      *
@@ -702,7 +581,7 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitDistinct(PODistinct op) throws VisitorException {
         try {
-            POLocalRearrange lr = localRearrangeFactory.create();
+            POLocalRearrangeTez lr = localRearrangeFactory.create();
             lr.setDistinct(true);
             lr.setAlias(op.getAlias());
             curTezOp.plan.addAsLeaf(lr);
@@ -751,23 +630,6 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitFilter(POFilter op) throws VisitorException {
         try {
-            if (curTezOp.isSplitSubPlan() || curTezOp.getSplitParent() != null) {
-                // Do not add the filter. Refer NoopFilterRemover.java of MR
-                PhysicalPlan filterPlan = op.getPlan();
-                if (filterPlan.size() == 1) {
-                    PhysicalOperator fp = filterPlan.getRoots().get(0);
-                    if (fp instanceof ConstantExpression) {
-                        ConstantExpression exp = (ConstantExpression)fp;
-                        Object value = exp.getValue();
-                        if (value instanceof Boolean) {
-                            Boolean filterValue = (Boolean)value;
-                            if (filterValue) {
-                                return;
-                            }
-                        }
-                    }
-                }
-            }
             nonBlocking(op);
             processUDFs(op.getPlan());
             phyToTezOpMap.put(op, curTezOp);
@@ -795,7 +657,7 @@ public class TezCompiler extends PhyPlan
                     lr.setOutputKey(curTezOp.getOperatorKey().toString());
 
                     tezOp.plan.addAsLeaf(lr);
-                    TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, tezOp, curTezOp);
+                    TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
                     if (tezOp.getSplitOperatorKey() != null) {
                         inputKeys.add(tezOp.getSplitOperatorKey().toString());
                     } else {
@@ -1983,7 +1845,7 @@ public class TezCompiler extends PhyPlan
             Pair<TezOperator, Integer> quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp);
             TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
 
-            TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]);
+            TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, prevOper, sortOpers[0]);
 
             // Use 1-1 edge
             edge.dataMovementType = DataMovementType.ONE_TO_ONE;
@@ -2003,7 +1865,7 @@ public class TezCompiler extends PhyPlan
             sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
             */
 
-            handleSplitAndConnect(tezPlan, prevOper, quantJobParallelismPair.first, false);
+            TezCompilerUtil.connect(tezPlan, prevOper, quantJobParallelismPair.first);
             lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
             lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
 
@@ -2036,66 +1898,30 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitSplit(POSplit op) throws VisitorException {
         try {
-            boolean isMultiQuery = "true".equalsIgnoreCase(pigContext
-                    .getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
-
-            if (isMultiQuery) {
-                if (splitsSeen.containsKey(op.getOperatorKey())) {
-                    // Since the plan for this split already exists in the tez plan,
-                    // discard the hierarchy or tez operators we constructed so far
-                    // till we encountered the split in this tree
-                    removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery);
-                    curTezOp = startNew(op.getOperatorKey());
-                } else {
-                    nonBlocking(op);
-                    if(curTezOp.isSplitSubPlan()) {
-                        // Split followed by another split
-                        // Set inputs to null as POSplit will attach input to roots
-                        for (PhysicalOperator root : curTezOp.plan.getRoots()) {
-                            root.setInputs(null);
-                        }
-                        TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey());
-                        POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey());
-                        split.addPlan(curTezOp.plan);
-                        addSubPlanPropertiesToParent(splitOp, curTezOp);
-                        splitsSeen.put(op.getOperatorKey(), splitOp);
-                        phyToTezOpMap.put(op, splitOp);
-                    } else {
-                        curTezOp.setSplitOperator(true);
-                        splitsSeen.put(op.getOperatorKey(), curTezOp);
-                        phyToTezOpMap.put(op, curTezOp);
-                    }
-                    curTezOp = startNew(op.getOperatorKey());
-                }
+            TezOperator splitOp = curTezOp;
+            POValueOutputTez output = null;
+            if (splitsSeen.containsKey(op.getOperatorKey())) {
+                splitOp = splitsSeen.get(op.getOperatorKey());
+                output = (POValueOutputTez)splitOp.plan.getLeaves().get(0);
             } else {
-                TezOperator splitOp = curTezOp;
-                POValueOutputTez output = null;
-                if (splitsSeen.containsKey(op.getOperatorKey())) {
-                    removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery);
-                    splitOp = splitsSeen.get(op.getOperatorKey());
-                    resetInputsOfPredecessors(tezPlan, splitOp);
-                    output = (POValueOutputTez)splitOp.plan.getLeaves().get(0);
-                } else {
-                    splitOp.setSplitOperator(true);
-                    splitsSeen.put(op.getOperatorKey(), splitOp);
-                    phyToTezOpMap.put(op, splitOp);
-                    output = new POValueOutputTez(OperatorKey.genOpKey(scope));
-                    splitOp.plan.addAsLeaf(output);
-                }
-                curTezOp = getTezOp();
-                curTezOp.setSplitParent(splitOp.getOperatorKey());
-                tezPlan.add(curTezOp);
-                output.addOutputKey(curTezOp.getOperatorKey().toString());
-                TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
-                //TODO shared edge once support is available in Tez
-                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
-                edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-                edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
-                curTezOp.setRequestedParallelismByReference(splitOp);
-                POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
-                input.setInputKey(splitOp.getOperatorKey().toString());
-                curTezOp.plan.addAsLeaf(input);
+                splitsSeen.put(op.getOperatorKey(), splitOp);
+                splitOp.setSplitter(true);
+                phyToTezOpMap.put(op, splitOp);
+                output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+                splitOp.plan.addAsLeaf(output);
             }
+            curTezOp = getTezOp();
+            tezPlan.add(curTezOp);
+            output.addOutputKey(curTezOp.getOperatorKey().toString());
+            TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
+            //TODO shared edge once support is available in Tez
+            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+            edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+            edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            curTezOp.setRequestedParallelismByReference(splitOp);
+            POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+            input.setInputKey(splitOp.getOperatorKey().toString());
+            curTezOp.plan.addAsLeaf(input);
         } catch (Exception e) {
             int errCode = 2034;
             String msg = "Error compiling operator "

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Feb 17 08:26:52 2014
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -200,6 +201,24 @@ public class TezDagBuilder extends TezOp
                 break;
             }
         }
+        
+        List<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(from.plan,
+                POValueOutputTez.class);
+        if (!valueOutputs.isEmpty()) {
+            POValueOutputTez valueOutput = valueOutputs.get(0);
+            for (String outputKey : valueOutput.outputKeys) {
+                if (outputKey.equals(to.getOperatorKey().toString())) {
+                    conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+                            POValueOutputTez.EmptyWritable.class.getName());
+                    conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+                            BinSedesTuple.class.getName());
+                    conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+                            POValueOutputTez.EmptyWritable.class.getName());
+                    conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+                            BinSedesTuple.class.getName());
+                }
+            }
+        }
 
         conf.setBoolean("mapred.mapper.new-api", true);
         conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
@@ -240,17 +259,6 @@ public class TezDagBuilder extends TezOp
                     edge.partitionerClass.getName());
         }
 
-        if (from.plan.getLeaves().get(0) instanceof POValueOutputTez) {
-            conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
-                    POValueOutputTez.EmptyWritable.class.getName());
-            conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
-                    BinSedesTuple.class.getName());
-            conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
-                    POValueOutputTez.EmptyWritable.class.getName());
-            conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
-                    BinSedesTuple.class.getName());
-        }
-
         MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
 
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
@@ -367,13 +375,22 @@ public class TezDagBuilder extends TezOp
             // Set input keys for POShuffleTezLoad. This is used to identify
             // the inputs that are attached to the POShuffleTezLoad in the
             // backend.
+            Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>();
             for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
                 if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) {
                     // skip sample vertex input
                 } else {
-                    newPack.addInputKey(pred.getOperatorKey().toString());
+                    LinkedList<POLocalRearrangeTez> lrs = PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
+                    for (POLocalRearrangeTez lr : lrs) {
+                        if (lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+                            localRearrangeMap.put((int)lr.getIndex(), pred.getOperatorKey().toString());
+                        }
+                    }
                 }
             }
+            for (Map.Entry<Integer, String> entry : localRearrangeMap.entrySet()) {
+                newPack.addInputKey(entry.getValue());
+            }
 
             if (succsList != null) {
                 for (PhysicalOperator succs : succsList) {
@@ -383,8 +400,22 @@ public class TezDagBuilder extends TezOp
 
             setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf,
                     tezOp);
+        } else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) {
+            POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0);
+            // TODO Need to fix multiple input key mapping
+            TezOperator identityInOutPred = null;
+            for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
+                if (!pred.isSampler()) {
+                    identityInOutPred = pred;
+                    break;
+                }
+            }
+            identityInOut.setInputKey(identityInOutPred.getOperatorKey().toString());
+        } else if (roots.size() == 1 && roots.get(0) instanceof POValueInputTez) {
+            POValueInputTez valueInput = (POValueInputTez) roots.get(0);
+            TezOperator pred = mPlan.getPredecessors(tezOp).get(0);
+            valueInput.setInputKey(pred.getOperatorKey().toString());
         }
-
         payloadConf.setClass("mapreduce.outputformat.class",
                 PigOutputFormat.class, OutputFormat.class);
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Feb 17 08:26:52 2014
@@ -168,6 +168,16 @@ public class TezLauncher extends Launche
             SecondaryKeyOptimizerTez skOptimizer = new SecondaryKeyOptimizerTez(tezPlan);
             skOptimizer.visit();
         }
+        
+        boolean isMultiQuery =
+                "true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
+
+        if (isMultiQuery) {
+            // reduces the number of TezOpers in the Tez plan generated
+            // by multi-query (multi-store) script.
+            MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan);
+            mqOptimizer.visit();
+        }
 
         // Run AccumulatorOptimizer on Tez plan
         boolean isAccum = Boolean.parseBoolean(pc.getProperties().getProperty(

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Mon Feb 17 08:26:52 2014
@@ -71,12 +71,8 @@ public class TezOperator extends Operato
     // Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit
     private OperatorKey splitOperatorKey = null;
 
-    // This indicates that this TezOper has POSplit as a parent.
-    // This is the case where multi-query is turned off.
-    private OperatorKey splitParent = null;
-
     // This indicates that this TezOper is a split operator
-    private boolean isSplitOper;
+    private boolean splitter;
 
     // Indicates that the plan creation is complete
     boolean closed = false;
@@ -179,24 +175,12 @@ public class TezOperator extends Operato
         this.splitOperatorKey = splitOperatorKey;
     }
 
-    public boolean isSplitSubPlan() {
-        return splitOperatorKey != null;
-    }
-
-    public OperatorKey getSplitParent() {
-        return splitParent;
-    }
-
-    public void setSplitParent(OperatorKey splitParent) {
-        this.splitParent = splitParent;
-    }
-
-    public boolean isSplitOperator() {
-        return isSplitOper;
+    public void setSplitter(boolean spl) {
+        splitter = spl;
     }
 
-    public void setSplitOperator(boolean isSplitOperator) {
-        this.isSplitOper = isSplitOperator;
+    public boolean isSplitter() {
+        return splitter;
     }
 
     public boolean isClosed() {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Mon Feb 17 08:26:52 2014
@@ -3,9 +3,11 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
@@ -76,12 +78,31 @@ public class TezCompilerUtil {
 
     static public TezEdgeDescriptor connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
         plan.connect(from, to);
+        PhysicalOperator leaf = from.plan.getLeaves().get(0);
+        // It could be POStoreTez incase of sampling job in order by
+        if (leaf instanceof POLocalRearrangeTez) {
+            POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
+            lr.setOutputKey(to.getOperatorKey().toString());
+        }
         // Add edge descriptors to old and new operators
         TezEdgeDescriptor edge = new TezEdgeDescriptor();
         to.inEdges.put(from.getOperatorKey(), edge);
         from.outEdges.put(to.getOperatorKey(), edge);
         return edge;
     }
+    
+    static public void connect(TezOperPlan plan, TezOperator from, TezOperator to, TezEdgeDescriptor edge) throws PlanException {
+        plan.connect(from, to);
+        PhysicalOperator leaf = from.plan.getLeaves().get(0);
+        // It could be POStoreTez incase of sampling job in order by
+        if (leaf instanceof POLocalRearrangeTez) {
+            POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
+            lr.setOutputKey(to.getOperatorKey().toString());
+        }
+        // Add edge descriptors to old and new operators
+        to.inEdges.put(from.getOperatorKey(), edge);
+        from.outEdges.put(to.getOperatorKey(), edge);
+    }
 
     static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) {
         PhysicalPlan forEachPlan = new PhysicalPlan();

Added: pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java?rev=1568901&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java Mon Feb 17 08:26:52 2014
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.data.Tuple;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
+
+public class ReadScalarsTez extends EvalFunc<Object> implements TezLoad {
+    private static final Log LOG = LogFactory.getLog(ReadScalarsTez.class);
+    String inputKey;
+    Tuple t;
+    public ReadScalarsTez(String inputKey) {
+        this.inputKey = inputKey;
+    }
+    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
+            throws ExecException {
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            BroadcastKVReader reader = (BroadcastKVReader)input.getReader();
+            reader.next();
+            t = (Tuple)reader.getCurrentValue();
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Object exec(Tuple input) throws IOException {
+        int pos = (Integer)input.get(0);
+        Object obj = t.get(pos);
+        return obj;
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Mon Feb 17 08:26:52 2014
@@ -98,9 +98,9 @@ public class ScalarVisitor extends AllEx
                     store.setTmpStore(true);
                     lp.add( store );
                     lp.connect( refOp, store );
-                    expr.setImplicitReferencedOperator(store);
                 }
-
+                
+                expr.setImplicitReferencedOperator(store);
                 filenameConst.setValue( store.getOutputSpec().getFileName() );
                 
                 if( lp.getSoftLinkSuccessors( store ) == null ||