Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC

svn commit: r1784224 [6/17] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Feb 24 03:34:37 2017
@@ -32,12 +32,10 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.hash.Hash;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
@@ -46,10 +44,8 @@ import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -86,10 +82,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
@@ -117,7 +110,6 @@ import org.apache.pig.impl.builtin.GetMe
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -175,10 +167,6 @@ public class TezCompiler extends PhyPlan
 
     private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
 
-    // Contains the inputs to operator like join, with the list maintaining the
-    // same order of join from left to right
-    private Map<TezOperator, List<TezOperator>> inputsMap;
-
     public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
     public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
     public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
@@ -187,8 +175,6 @@ public class TezCompiler extends PhyPlan
     private boolean optimisticFileConcatenation = false;
     private List<String> readOnceLoadFuncs = null;
 
-    private Configuration conf;
-
     private POLocalRearrangeTezFactory localRearrangeFactory;
 
     public TezCompiler(PhysicalPlan plan, PigContext pigContext)
@@ -198,7 +184,6 @@ public class TezCompiler extends PhyPlan
         this.pigContext = pigContext;
 
         pigProperties = pigContext.getProperties();
-        conf = ConfigurationUtil.toConfiguration(pigProperties, false);
         splitsSeen = Maps.newHashMap();
         tezPlan = new TezOperPlan();
         nig = NodeIdGenerator.getGenerator();
@@ -212,7 +197,6 @@ public class TezCompiler extends PhyPlan
         scope = roots.get(0).getOperatorKey().getScope();
         localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
         phyToTezOpMap = Maps.newHashMap();
-        inputsMap = Maps.newHashMap();
 
         fileConcatenationThreshold = Integer.parseInt(pigProperties
                 .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
@@ -671,8 +655,15 @@ public class TezCompiler extends PhyPlan
             blocking();
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
 
-            TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey());
-            edge.setNeedsDistinctCombiner(true);
+            // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented
+            // with a global variable and a specific DistinctCombiner class. This seems better.
+            PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan;
+            addDistinctPlan(combinePlan, 1);
+
+            POLocalRearrangeTez clr = localRearrangeFactory.create();
+            clr.setOutputKey(curTezOp.getOperatorKey().toString());
+            clr.setDistinct(true);
+            combinePlan.addAsLeaf(clr);
 
             curTezOp.markDistinct();
             addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
@@ -865,7 +856,6 @@ public class TezCompiler extends PhyPlan
             } else {
                 curTezOp.plan.addAsLeaf(op);
             }
-            phyToTezOpMap.put(op, curTezOp);
 
         } catch (Exception e) {
             int errCode = 2034;
@@ -910,7 +900,6 @@ public class TezCompiler extends PhyPlan
     public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
         try {
             blocking();
-            inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs)));
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             if (op.isCross()) {
@@ -1099,7 +1088,7 @@ public class TezCompiler extends PhyPlan
         indexerTezOp.setDontEstimateParallelism(true);
 
         POStore st = TezCompilerUtil.getStore(scope, nig);
-        FileSpec strFile = getTempFileSpec(pigContext);
+        FileSpec strFile = getTempFileSpec();
         st.setSFile(strFile);
         indexAggrOper.plan.addAsLeaf(st);
         indexAggrOper.setClosed(true);
@@ -1266,7 +1255,7 @@ public class TezCompiler extends PhyPlan
                 rightTezOprAggr.setDontEstimateParallelism(true);
 
                 POStore st = TezCompilerUtil.getStore(scope, nig);
-                FileSpec strFile = getTempFileSpec(pigContext);
+                FileSpec strFile = getTempFileSpec();
                 st.setSFile(strFile);
                 rightTezOprAggr.plan.addAsLeaf(st);
                 rightTezOprAggr.setClosed(true);
@@ -1357,9 +1346,6 @@ public class TezCompiler extends PhyPlan
                 } else if (op.getNumInps() > 1) {
                     curTezOp.markCogroup();
                 }
-            } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
-                curTezOp.markRegularJoin();
-                addBloomToJoin(op, curTezOp);
             }
         } catch (Exception e) {
             int errCode = 2034;
@@ -1368,132 +1354,6 @@ public class TezCompiler extends PhyPlan
         }
     }
 
-    private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException {
-
-        List<TezOperator> inputs = inputsMap.get(curTezOp);
-        TezOperator buildBloomOp;
-        List<TezOperator> applyBloomOps = new ArrayList<>();
-
-        String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY);
-        boolean createBloomInMap = "map".equals(strategy);
-        if (!createBloomInMap && !strategy.equals("reduce")) {
-            throw new PlanException(new IllegalArgumentException(
-                    "Invalid value for "
-                            + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " -  "
-                            + strategy + ". Valid values are map and reduce"));
-        }
-        int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS);
-        int vectorSizeBytes =  conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES);
-        int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf);
-        int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE));
-
-        // We build bloom of the right most input and apply the bloom filter on the left inputs by default.
-        // But in case of left outer join we build bloom of the left input and use it on the right input
-        boolean[] inner = op.getPkgr().getInner();
-        boolean skipNullKeys = true;
-        if (inner[inner.length - 1]) {  // inner has from right to left while inputs has from left to right
-            buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input
-            for (int i = 0; i < (inner.length - 1); i++) {
-                applyBloomOps.add(inputs.get(i));
-            }
-            skipNullKeys = inner[0];
-        } else {
-            // Left outer join
-            skipNullKeys = false;
-            buildBloomOp = inputs.get(0); // Bloom filter is built from left most input
-            for (int i = 1; i < inner.length; i++) {
-                applyBloomOps.add(inputs.get(i));
-            }
-        }
-
-        // Add BuildBloom operator to the input
-        POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0);
-        POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
-        bbr.setSkipNullKeys(skipNullKeys);
-        buildBloomOp.plan.remove(lr);
-        buildBloomOp.plan.addAsLeaf(bbr);
-
-        // Add a new reduce vertex that will construct the final bloom filter
-        //    - by combining the bloom filters from the buildBloomOp input tasks in the map strategy
-        //    - or directly from the keys from the buildBloomOp input tasks in the reduce strategy
-        TezOperator combineBloomOp = getTezOp();
-        tezPlan.add(combineBloomOp);
-        combineBloomOp.markBuildBloom();
-        // Explicitly set the parallelism for the new vertex to number of bloom filters.
-        // Auto parallelism will bring it down based on the actual output size
-        combineBloomOp.setEstimatedParallelism(numBloomFilters);
-        // We don't want parallelism to be changed during the run by grace auto parallelism
-        // It will take the whole input size and estimate way higher
-        combineBloomOp.setDontEstimateParallelism(true);
-
-        String combineBloomOpKey = combineBloomOp.getOperatorKey().toString();
-        TezEdgeDescriptor edge = new TezEdgeDescriptor();
-        TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge);
-        bbr.setBloomOutputKey(combineBloomOpKey);
-
-
-        POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
-        pkg.setNumInps(1);
-        BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);;
-        pkgr.setKeyType(DataType.INTEGER);
-        pkg.setPkgr(pkgr);
-        POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
-        combineBloomOp.plan.addAsLeaf(pkg);
-        combineBloomOp.plan.addAsLeaf(combineBloomOutput);
-
-        edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
-        edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
-
-        // Add combiner as well.
-        POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
-        BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
-        combinerPkgr.setCombiner(true);
-        combinerPkgr.setKeyType(DataType.INTEGER);
-        pkg_c.setPkgr(combinerPkgr);
-        pkg_c.setNumInps(1);
-        edge.combinePlan.addAsLeaf(pkg_c);
-        POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
-        prjKey.setResultType(DataType.INTEGER);
-        List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
-        PhysicalPlan pp = new PhysicalPlan();
-        pp.add(prjKey);
-        clrInps.add(pp);
-        POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
-        clr.setOutputKey(combineBloomOpKey);
-        edge.combinePlan.addAsLeaf(clr);
-
-        if (createBloomInMap) {
-            // No combiner needed on map as there will be only one bloom filter per map for each partition
-            // In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager
-            edge.setCombinerInMap(false);
-            edge.setCombinerInReducer(true);
-        } else {
-            pkgr.setBloomKeyType(op.getPkgr().getKeyType());
-            // Do distinct of the keys on the map side to reduce data sent to reducers.
-            // In case of reduce, not adding a combiner and doing the distinct during reduce itself.
-            // If needed one can be added later
-            edge.setCombinerInMap(true);
-            edge.setCombinerInReducer(false);
-        }
-
-        // Broadcast the final bloom filter to other inputs
-        for (TezOperator applyBloomOp : applyBloomOps) {
-            applyBloomOp.markFilterBloom();
-            lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0);
-            POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, numBloomFilters);
-            applyBloomOp.plan.remove(lr);
-            applyBloomOp.plan.addAsLeaf(bfr);
-            bfr.setInputKey(combineBloomOpKey);
-            edge = new TezEdgeDescriptor();
-            edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
-            edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
-            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
-            TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, edge);
-            combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString());
-        }
-
-    }
-
     @Override
     public void visitPOForEach(POForEach op) throws VisitorException{
         try{
@@ -1653,7 +1513,7 @@ public class TezCompiler extends PhyPlan
 
             for (int i=0; i<transformPlans.size(); i++) {
                 eps1.add(transformPlans.get(i));
-                flat1.add(i == transformPlans.size() - 1 ? true : false);
+                flat1.add(true);
             }
 
             // This foreach will pick the sort key columns from the POPoissonSample output
@@ -1862,7 +1722,7 @@ public class TezCompiler extends PhyPlan
      * @return
      * @throws IOException
      */
-    public static FileSpec getTempFileSpec(PigContext pigContext) throws IOException {
+    private FileSpec getTempFileSpec() throws IOException {
         return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
                 new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }
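
For context on the bloom join support removed above: addBloomToJoin builds a Bloom filter from the join keys of one input and broadcasts it so the other inputs can drop non-matching records before the shuffle. The following stand-alone sketch illustrates only that idea; the class and method names are hypothetical and use nothing from Pig or Hadoop.

    import java.util.Arrays;
    import java.util.BitSet;
    import java.util.List;

    // Minimal sketch of the bloom-join idea: build a Bloom filter over one
    // side's join keys, then filter the other side with it before joining.
    // Hypothetical names; not Pig code.
    public class BloomJoinSketch {

        /** Tiny Bloom filter: k hash probes into a fixed-size bit set. */
        static class SimpleBloom {
            private final BitSet bits;
            private final int numBits;
            private final int numHash;

            SimpleBloom(int numBits, int numHash) {
                this.bits = new BitSet(numBits);
                this.numBits = numBits;
                this.numHash = numHash;
            }

            private int position(Object key, int i) {
                int h = 31 * key.hashCode() + i * 0x01000193;
                return (h & 0x7fffffff) % numBits;
            }

            void add(Object key) {
                for (int i = 0; i < numHash; i++) {
                    bits.set(position(key, i));
                }
            }

            boolean mightContain(Object key) {
                for (int i = 0; i < numHash; i++) {
                    if (!bits.get(position(key, i))) {
                        return false;   // definitely absent
                    }
                }
                return true;            // possibly present (false positives allowed)
            }
        }

        public static void main(String[] args) {
            // Build side: join keys of the right-most input (inner join case).
            List<Integer> buildKeys = Arrays.asList(1, 5, 9);
            SimpleBloom bloom = new SimpleBloom(1 << 16, 3);
            buildKeys.forEach(bloom::add);

            // Apply side: records whose key fails the filter never reach the join.
            List<Integer> probeKeys = Arrays.asList(1, 2, 5, 7, 9);
            probeKeys.stream()
                     .filter(bloom::mightContain)
                     .forEach(k -> System.out.println("key " + k + " passes the bloom filter"));
        }
    }

In the removed code the equivalent build happens in POBuildBloomRearrangeTez and the filtering in POBloomFilterRearrangeTez, with the combined filter broadcast to the apply-side vertices over a Tez edge.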

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Fri Feb 24 03:34:37 2017
@@ -31,13 +31,8 @@ import org.apache.tez.runtime.library.ou
  * Descriptor for Tez edge. It holds combine plan as well as edge properties.
  */
 public class TezEdgeDescriptor implements Serializable {
-
-    public transient PhysicalPlan combinePlan;
-    private boolean needsDistinctCombiner;
-    // Combiner runs on both input and output of Tez edge by default
-    // It can be configured to run only in output(map) or input(reduce)
-    private Boolean combinerInMap;
-    private Boolean combinerInReducer;
+    // Combiner runs on both input and output of Tez edge.
+    transient public PhysicalPlan combinePlan;
 
     public String inputClassName;
     public String outputClassName;
@@ -70,30 +65,6 @@ public class TezEdgeDescriptor implement
         dataMovementType = DataMovementType.SCATTER_GATHER;
     }
 
-    public boolean needsDistinctCombiner() {
-        return needsDistinctCombiner;
-    }
-
-    public void setNeedsDistinctCombiner(boolean nic) {
-        needsDistinctCombiner = nic;
-    }
-
-    public Boolean getCombinerInMap() {
-        return combinerInMap;
-    }
-
-    public void setCombinerInMap(Boolean combinerInMap) {
-        this.combinerInMap = combinerInMap;
-    }
-
-    public Boolean getCombinerInReducer() {
-        return combinerInReducer;
-    }
-
-    public void setCombinerInReducer(Boolean combinerInReducer) {
-        this.combinerInReducer = combinerInReducer;
-    }
-
     public boolean isUseSecondaryKey() {
         return useSecondaryKey;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Fri Feb 24 03:34:37 2017
@@ -25,9 +25,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -218,12 +217,8 @@ public class TezOperPlan extends Operato
             newPlan.add(node);
         }
 
-        // Using a LinkedHashSet and doing a sort so that
-        // test plan printed remains same between jdk7 and jdk8
-        Set<Pair<TezOperator, TezOperator>> toReconnect = new LinkedHashSet<Pair<TezOperator, TezOperator>>();
-        List<TezOperator> fromEdges = new ArrayList<>(mFromEdges.keySet());
-        Collections.sort(fromEdges);
-        for (TezOperator from : fromEdges) {
+        Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>();
+        for (TezOperator from : mFromEdges.keySet()) {
             List<TezOperator> tos = mFromEdges.get(from);
             for (TezOperator to : tos) {
                 if (list.contains(from) || list.contains(to)) {
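
The comment removed above notes that a LinkedHashSet plus a sort keeps printed test plans identical between JDK 7 and JDK 8. A small stand-alone illustration of that determinism concern, assuming nothing about Pig's classes:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class StableIterationSketch {
        public static void main(String[] args) {
            Map<String, Integer> fromEdges = new HashMap<>();
            fromEdges.put("scope-12", 1);
            fromEdges.put("scope-3", 1);
            fromEdges.put("scope-7", 1);

            // HashMap key order depends on the JVM's hashing details,
            // so output derived from it can differ between JDK versions.
            System.out.println("unordered: " + fromEdges.keySet());

            // Sorting a copy of the keys makes the walk (and any plan text
            // printed from it) deterministic.
            List<String> keys = new ArrayList<>(fromEdges.keySet());
            Collections.sort(keys);
            System.out.println("sorted:    " + keys);
        }
    }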

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Feb 24 03:34:37 2017
@@ -181,11 +181,7 @@ public class TezOperator extends Operato
         // Indicate if this job is a native job
         NATIVE,
         // Indicate if this job does rank counter
-        RANK_COUNTER,
-        // Indicate if this job constructs bloom filter
-        BUILDBLOOM,
-        // Indicate if this job applies bloom filter
-        FILTERBLOOM;
+        RANK_COUNTER;
     };
 
     // Features in the job/vertex. Mostly will be only one feature.
@@ -239,7 +235,6 @@ public class TezOperator extends Operato
     }
 
     private LoaderInfo loaderInfo = new LoaderInfo();
-    private long totalInputFilesSize = -1;
 
     public TezOperator(OperatorKey k) {
         super(k);
@@ -457,22 +452,6 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
     }
 
-    public boolean isBuildBloom() {
-        return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal());
-    }
-
-    public void markBuildBloom() {
-        feature.set(OPER_FEATURE.BUILDBLOOM.ordinal());
-    }
-
-    public boolean isFilterBloom() {
-        return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal());
-    }
-
-    public void markFilterBloom() {
-        feature.set(OPER_FEATURE.FILTERBLOOM.ordinal());
-    }
-
     public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
         for (OPER_FEATURE opf : OPER_FEATURE.values()) {
             if (excludeFeatures != null && excludeFeatures.contains(opf)) {
@@ -672,14 +651,6 @@ public class TezOperator extends Operato
         return loaderInfo;
     }
 
-    public long getTotalInputFilesSize() {
-        return totalInputFilesSize;
-    }
-
-    public void setTotalInputFilesSize(long totalInputFilesSize) {
-        this.totalInputFilesSize = totalInputFilesSize;
-    }
-
     public void setUseGraceParallelism(boolean useGraceParallelism) {
         this.useGraceParallelism = useGraceParallelism;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Fri Feb 24 03:34:37 2017
@@ -31,7 +31,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -162,7 +161,7 @@ public class TezPOPackageAnnotator exten
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
             POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
-            if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) {
+            if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) {
                 return;
             }
             loRearrangeFound++;
@@ -181,9 +180,7 @@ public class TezPOPackageAnnotator exten
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
 
-            // For BloomPackager there is only one input, but the
-            // POBuildBloomRearrangeTez index is that of the join's index and can be non-zero
-            Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : Integer.valueOf(lrearrange.getIndex());
+            Integer index = Integer.valueOf(lrearrange.getIndex());
             if(keyInfo.get(index) != null) {
                 if (isPOSplit) {
                     // Case of POSplit having more than one input in case of self join or union
@@ -200,20 +197,12 @@ public class TezPOPackageAnnotator exten
 
             }
 
-            if (pkg.getPkgr() instanceof BloomPackager ) {
-                keyInfo.put(index,
-                        new Pair<Boolean, Map<Integer, Integer>>(
-                                Boolean.FALSE, new HashMap<Integer, Integer>()));
-                pkg.getPkgr().setKeyInfo(keyInfo);
-            } else {
-                keyInfo.put(index,
-                        new Pair<Boolean, Map<Integer, Integer>>(
-                                lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
-                pkg.getPkgr().setKeyInfo(keyInfo);
-                pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
-                pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
-            }
-
+            keyInfo.put(index,
+                    new Pair<Boolean, Map<Integer, Integer>>(
+                            lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+            pkg.getPkgr().setKeyInfo(keyInfo);
+            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
         }
 
         /**

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Feb 24 03:34:37 2017
@@ -29,14 +29,9 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -165,178 +160,100 @@ public class TezPlanContainer extends Op
             return;
         }
 
-        List<TezOperator> opersToSegment = null;
+        TezOperator operToSegment = null;
+        List<TezOperator> succs = new ArrayList<TezOperator>();
         try {
             // Split top down from root to leaves
-            // Get list of operators closer to the root that can be segmented together
-            FirstLevelSegmentOperatorsFinder finder = new FirstLevelSegmentOperatorsFinder(tezOperPlan);
+            SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan);
             finder.visit();
-            opersToSegment = finder.getOperatorsToSegment();
+            operToSegment = finder.getOperatorToSegment();
         } catch (VisitorException e) {
             throw new PlanException(e);
         }
-        if (!opersToSegment.isEmpty()) {
-            Set<TezOperator> commonSplitterPredecessors = new HashSet<>();
-            for (TezOperator operToSegment : opersToSegment) {
-                for (TezOperator succ : tezOperPlan.getSuccessors(operToSegment)) {
-                    commonSplitterPredecessors
-                            .addAll(getCommonSplitterPredecessors(tezOperPlan,
-                                    operToSegment, succ));
-                }
-            }
 
-            if (commonSplitterPredecessors.isEmpty()) {
-                List<TezOperator> allSuccs = new ArrayList<TezOperator>();
-                // Disconnect all the successors and move them to a new plan
-                for (TezOperator operToSegment : opersToSegment) {
-                    List<TezOperator> succs = new ArrayList<TezOperator>();
-                    succs.addAll(tezOperPlan.getSuccessors(operToSegment));
-                    allSuccs.addAll(succs);
-                    for (TezOperator succ : succs) {
-                        tezOperPlan.disconnect(operToSegment, succ);
+        if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) {
+            succs.addAll(tezOperPlan.getSuccessors(operToSegment));
+            for (TezOperator succ : succs) {
+                tezOperPlan.disconnect(operToSegment, succ);
+            }
+            for (TezOperator succ : succs) {
+                try {
+                    if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) {
+                        // Has already been moved to a new plan by previous successor
+                        // as part of dependency. It could have been further split.
+                        // So walk the full plan to find the new plan and connect
+                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
+                        finder.visit();
+                        connect(planNode, finder.getPlanContainerNode());
+                        continue;
                     }
-                }
-                TezOperPlan newOperPlan = new TezOperPlan();
-                for (TezOperator succ : allSuccs) {
+                    TezOperPlan newOperPlan = new TezOperPlan();
                     tezOperPlan.moveTree(succ, newOperPlan);
-                }
-                TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
-                        generateNodeOperatorKey(), newOperPlan);
-                add(newPlanNode);
-                connect(planNode, newPlanNode);
-                split(newPlanNode);
-            } else {
-                // If there is a common splitter predecessor between operToSegment and the successor,
-                // we have to separate out that split to be able to segment.
-                // So we store the output of split to a temp store and then change the
-                // splittees to load from it.
-                String scope = opersToSegment.get(0).getOperatorKey().getScope();
-                for (TezOperator splitter : commonSplitterPredecessors) {
-                    try {
-                        List<TezOperator> succs = new ArrayList<TezOperator>();
-                        succs.addAll(tezOperPlan.getSuccessors(splitter));
-                        FileSpec fileSpec = TezCompiler.getTempFileSpec(pigContext);
-                        POStore tmpStore = getTmpStore(scope, fileSpec);
-                        // Replace POValueOutputTez with POStore
-                        splitter.plan.remove(splitter.plan.getLeaves().get(0));
-                        splitter.plan.addAsLeaf(tmpStore);
-                        splitter.segmentBelow = true;
-                        splitter.setSplitter(false);
-                        for (TezOperator succ : succs) {
-                            // Replace POValueInputTez with POLoad
-                            POLoad tmpLoad = getTmpLoad(scope, fileSpec);
-                            succ.plan.replace(succ.plan.getRoots().get(0), tmpLoad);
-                        }
-                    } catch (Exception e) {
-                        throw new PlanException(e);
+                    TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
+                            generateNodeOperatorKey(), newOperPlan);
+                    add(newPlanNode);
+                    connect(planNode, newPlanNode);
+                    split(newPlanNode);
+                    if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) {
+                        // On further split, the successor moved to a new plan container.
+                        // Connect to that
+                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
+                        finder.visit();
+                        disconnect(planNode, newPlanNode);
+                        connect(planNode, finder.getPlanContainerNode());
                     }
+                } catch (VisitorException e) {
+                    throw new PlanException(e);
                 }
             }
             split(planNode);
         }
     }
 
-    private static class FirstLevelSegmentOperatorsFinder extends TezOpPlanVisitor {
+    private static class SegmentOperatorFinder extends TezOpPlanVisitor {
 
-        private List<TezOperator> opersToSegment = new ArrayList<>();
+        private TezOperator operToSegment;
 
-        public FirstLevelSegmentOperatorsFinder(TezOperPlan plan) {
+        public SegmentOperatorFinder(TezOperPlan plan) {
             super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         }
 
-        public List<TezOperator> getOperatorsToSegment() {
-            return opersToSegment;
+        public TezOperator getOperatorToSegment() {
+            return operToSegment;
         }
 
         @Override
-        public void visitTezOp(TezOperator tezOp) throws VisitorException {
-            if (tezOp.needSegmentBelow() && getPlan().getSuccessors(tezOp) != null) {
-                if (opersToSegment.isEmpty()) {
-                    opersToSegment.add(tezOp);
-                } else {
-                    // If the operator does not have dependency on previous
-                    // operators chosen for segmenting then add it to the
-                    // operators to be segmented together
-                    if (!hasPredecessor(tezOp, opersToSegment)) {
-                        opersToSegment.add(tezOp);
-                    }
-                }
+        public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+            if (tezOperator.needSegmentBelow() && operToSegment == null) {
+                operToSegment = tezOperator;
             }
         }
 
-        /**
-         * Check if the tezOp has one of the opsToCheck as a predecessor.
-         * It can be a immediate predecessor or multiple levels up.
-         */
-        private boolean hasPredecessor(TezOperator tezOp, List<TezOperator> opsToCheck) {
-            List<TezOperator> predecessors = getPlan().getPredecessors(tezOp);
-            if (predecessors != null) {
-                for (TezOperator pred : predecessors) {
-                    if (opersToSegment.contains(pred)) {
-                        return true;
-                    } else {
-                        if (hasPredecessor(pred, opsToCheck)) {
-                            return true;
-                        }
-                    }
-                }
-            }
-            return false;
-        }
-
     }
 
-    private Set<TezOperator> getCommonSplitterPredecessors(TezOperPlan plan, TezOperator operToSegment, TezOperator successor) {
-        Set<TezOperator> splitters1 = new HashSet<>();
-        Set<TezOperator> splitters2 = new HashSet<>();
-        Set<TezOperator> processedPredecessors = new HashSet<>();
-        // Find predecessors which are splitters
-        fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1);
-        if (!splitters1.isEmpty()) {
-            // For the successor, traverse rest of the plan below it and
-            // search the predecessors of its successors to find any predecessor that might be a splitter.
-            Set<TezOperator> allSuccs = new HashSet<>();
-            getAllSuccessors(plan, successor, allSuccs);
-            processedPredecessors.clear();
-            processedPredecessors.add(successor);
-            for (TezOperator succ : allSuccs) {
-                fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2);
-            }
-            // Find the common ones
-            splitters1.retainAll(splitters2);
+    private static class TezOperatorFinder extends TezPlanContainerVisitor {
+
+        private TezPlanContainerNode planContainerNode;
+        private TezOperator operatorToFind;
+
+        public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) {
+            super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan));
+            this.operatorToFind = operatorToFind;
         }
-        return splitters1;
-    }
 
-    private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp,
-            Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) {
-        List<TezOperator> predecessors = plan.getPredecessors(tezOp);
-        if (predecessors != null) {
-            for (TezOperator pred : predecessors) {
-                // Skip processing already processed predecessor to avoid loops
-                if (processedPredecessors.contains(pred)) {
-                    continue;
-                }
-                if (pred.isSplitter()) {
-                    splitters.add(pred);
-                } else if (!pred.needSegmentBelow()) {
-                    processedPredecessors.add(pred);
-                    fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters);
-                }
-            }
+        public TezPlanContainerNode getPlanContainerNode() {
+            return planContainerNode;
         }
-    }
 
-    private void getAllSuccessors(TezOperPlan plan, TezOperator tezOp, Set<TezOperator> allSuccs) {
-        List<TezOperator> successors = plan.getSuccessors(tezOp);
-        if (successors != null) {
-            for (TezOperator succ : successors) {
-                if (!allSuccs.contains(succ)) {
-                    allSuccs.add(succ);
-                    getAllSuccessors(plan, succ, allSuccs);
-                }
+        @Override
+        public void visitTezPlanContainerNode(
+                TezPlanContainerNode tezPlanContainerNode)
+                throws VisitorException {
+            if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) {
+                planContainerNode = tezPlanContainerNode;
             }
         }
+
     }
 
     private synchronized OperatorKey generateNodeOperatorKey() {
@@ -350,21 +267,6 @@ public class TezPlanContainer extends Op
         scopeId = 0;
     }
 
-    private POLoad getTmpLoad(String scope, FileSpec fileSpec){
-        POLoad ld = new POLoad(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-        ld.setPc(pigContext);
-        ld.setIsTmpLoad(true);
-        ld.setLFile(fileSpec);
-        return ld;
-    }
-
-    private POStore getTmpStore(String scope, FileSpec fileSpec){
-        POStore st = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-        st.setIsTmpStore(true);
-        st.setSFile(fileSpec);
-        return new POStoreTez(st);
-    }
-
     @Override
     public String toString() {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Fri Feb 24 03:34:37 2017
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
@@ -81,9 +80,6 @@ public class TezPrinter extends TezOpPla
                     printer.setVerbose(isVerbose);
                     printer.visit();
                     mStream.println();
-                } else if (edgeDesc.needsDistinctCombiner()) {
-                    mStream.println("# Combine plan on edge <" + inEdge + ">");
-                    mStream.println(DistinctCombiner.Combine.class.getName());
                 }
             }
         }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java Fri Feb 24 03:34:37 2017
@@ -56,7 +56,6 @@ public class POCounterStatsTez extends P
     private transient KeyValuesReader reader;
     private transient KeyValueWriter writer;
     private transient boolean finished = false;
-    private transient boolean hasNext = false;
 
     public POCounterStatsTez(OperatorKey k) {
         super(k);
@@ -89,7 +88,6 @@ public class POCounterStatsTez extends P
         try {
             reader = (KeyValuesReader) input.getReader();
             LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
-            hasNext = reader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -132,13 +130,12 @@ public class POCounterStatsTez extends P
             Integer key = null;
             Long value = null;
             // Read count of records per task
-            while (hasNext) {
+            while (reader.next()) {
                 key = ((IntWritable)reader.getCurrentKey()).get();
                 for (Object val : reader.getCurrentValues()) {
                     value = ((LongWritable)val).get();
                     counterRecords.put(key, value);
                 }
-                hasNext = reader.next();
             }
 
             // BinInterSedes only takes String for map key

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Fri Feb 24 03:34:37 2017
@@ -19,8 +19,6 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -103,13 +101,9 @@ public class POFRJoinTez extends POFRJoi
                 LogicalInput input = inputs.get(key);
                 if (!this.replInputs.contains(input)) {
                     this.replInputs.add(input);
-                    KeyValueReader reader = (KeyValueReader) input.getReader();
-                    this.replReaders.add(reader);
-                    log.info("Attached input from vertex " + key + " : input=" + input + ", reader=" + reader);
+                    this.replReaders.add((KeyValueReader) input.getReader());
                 }
             }
-            // Do not force fetch input by reading first record. Cases like MultiQuery_Union_4 have
-            // multiple POFRJoinTez loading same replicate input and will skip records
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -120,7 +114,6 @@ public class POFRJoinTez extends POFRJoi
      *
      * @throws ExecException
      */
-    @SuppressWarnings("unchecked")
     @Override
     protected void setUpHashMap() throws ExecException {
 
@@ -128,8 +121,8 @@ public class POFRJoinTez extends POFRJoi
         // where same POFRJoinTez occurs in different Split sub-plans
         Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
         if (cacheValue != null) {
-            replicates =  (List<Map<? extends Object, ? extends List<Tuple>>>) cacheValue;
-            log.info("Found " + (replicates.size() - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
+            replicates = (TupleToMapKey[]) cacheValue;
+            log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
             return;
         }
 
@@ -155,7 +148,7 @@ public class POFRJoinTez extends POFRJoi
 
         long time1 = System.currentTimeMillis();
 
-        replicates.set(fragment, null);
+        replicates[fragment] = null;
         int inputIdx = 0;
         // We need to adjust the index because the number of replInputs is
         // one less than the number of inputSchemas. The inputSchemas
@@ -165,12 +158,7 @@ public class POFRJoinTez extends POFRJoi
             SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[schemaIdx];
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[schemaIdx];
 
-            Map<Object, ArrayList<Tuple>> replicate;
-            if (keySchemaTupleFactory == null) {
-                replicate = new HashMap<Object, ArrayList<Tuple>>(4000);
-            } else {
-                replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
-            }
+            TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
             POLocalRearrange lr = LRs[schemaIdx];
 
             try {
@@ -180,8 +168,7 @@ public class POFRJoinTez extends POFRJoi
                     }
 
                     PigNullableWritable key = (PigNullableWritable) replReaders.get(inputIdx).getCurrentKey();
-                    Object keyValue = key.getValueAsPigType();
-                    if (isKeyNull(keyValue)) continue;
+                    if (isKeyNull(key.getValueAsPigType())) continue;
                     NullableTuple val = (NullableTuple) replReaders.get(inputIdx).getCurrentValue();
 
                     // POFRJoin#getValueTuple() is reused to construct valTuple,
@@ -189,31 +176,27 @@ public class POFRJoinTez extends POFRJoi
                     // construct one here.
                     Tuple retTuple = mTupleFactory.newTuple(3);
                     retTuple.set(0, key.getIndex());
-                    retTuple.set(1, keyValue);
+                    retTuple.set(1, key.getValueAsPigType());
                     retTuple.set(2, val.getValueAsPigType());
                     Tuple valTuple = getValueTuple(lr, retTuple);
 
-                    ArrayList<Tuple> values = replicate.get(keyValue);
-                    if (values == null) {
-                        if (inputSchemaTupleFactory == null) {
-                            values = new ArrayList<Tuple>(1);
-                        } else {
-                            values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
-                        }
-                        replicate.put(keyValue, values);
+                    Tuple keyTuple = mTupleFactory.newTuple(1);
+                    keyTuple.set(0, key.getValueAsPigType());
+                    if (replicate.get(keyTuple) == null) {
+                        replicate.put(keyTuple, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
                     }
-                    values.add(valTuple);
+                    replicate.get(keyTuple).add(valTuple);
                 }
             } catch (IOException e) {
                 throw new ExecException(e);
             }
-            replicates.set(schemaIdx, replicate);
+            replicates[schemaIdx] = replicate;
             inputIdx++;
             schemaIdx++;
         }
 
         long time2 = System.currentTimeMillis();
-        log.info((replicates.size() - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
+        log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
 
         ObjectCache.getInstance().cache(cacheKey, replicates);
         log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Fri Feb 24 03:34:37 2017
@@ -57,7 +57,6 @@ public class POIdentityInOutTez extends
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean finished = false;
-    private transient boolean hasNext = false;
 
     public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey) {
         super(inputRearrange);
@@ -96,12 +95,9 @@ public class POIdentityInOutTez extends
             Reader r = input.getReader();
             if (r instanceof KeyValueReader) {
                 reader = (KeyValueReader) r;
-                // Force input fetch
-                hasNext = reader.next();
             } else {
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
-                hasNext = shuffleReader.next();
             }
             LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r);
         } catch (Exception e) {
@@ -131,7 +127,7 @@ public class POIdentityInOutTez extends
                 return RESULT_EOP;
             }
             if (shuffleInput) {
-                while (hasNext) {
+                while (shuffleReader.next()) {
                     Object curKey = shuffleReader.getCurrentKey();
                     Iterable<Object> vals = shuffleReader.getCurrentValues();
                     if (isSkewedJoin) {
@@ -143,10 +139,9 @@ public class POIdentityInOutTez extends
                     for (Object val : vals) {
                         writer.write(curKey, val);
                     }
-                    hasNext = shuffleReader.next();
                 }
             } else {
-                while (hasNext) {
+                while (reader.next()) {
                     if (isSkewedJoin) {
                         NullablePartitionWritable wrappedKey = new NullablePartitionWritable(
                                 (PigNullableWritable) reader.getCurrentKey());
@@ -160,7 +155,6 @@ public class POIdentityInOutTez extends
                         writer.write(reader.getCurrentKey(),
                                 reader.getCurrentValue());
                     }
-                    hasNext = reader.next();
                 }
             }
             finished = true;
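
The hasNext bookkeeping removed from POIdentityInOutTez above pre-reads the first record while inputs are attached; the comment removed from POFRJoinTez earlier in this diff warns that such eager reads can skip records when several operators end up sharing one input. A loose, Tez-independent illustration of that pitfall using a plain iterator:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    // Not Pig code; just shows why consuming a shared reader during setup loses data.
    public class SharedReaderSketch {
        public static void main(String[] args) {
            List<String> records = Arrays.asList("r1", "r2", "r3");
            // Two consumers end up holding the same underlying reader.
            Iterator<String> sharedReader = records.iterator();

            // Consumer A "force fetches" during setup and advances the reader.
            if (sharedReader.hasNext()) {
                sharedReader.next();            // r1 is consumed here...
            }

            // Consumer B later drains the reader and never sees r1.
            while (sharedReader.hasNext()) {
                System.out.println("consumer B reads " + sharedReader.next());
            }
        }
    }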

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Fri Feb 24 03:34:37 2017
@@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends
         }
     }
 
-    public boolean containsOutputKey(String key) {
-        return outputKey.equals(key);
+    public String getOutputKey() {
+        return outputKey;
     }
 
     public void setOutputKey(String outputKey) {
@@ -122,10 +122,6 @@ public class POLocalRearrangeTez extends
         }
     }
 
-    protected Result getRearrangedTuple() throws ExecException {
-        return super.getNextTuple();
-    }
-
     @Override
     public Result getNextTuple() throws ExecException {
         res = super.getNextTuple();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java Fri Feb 24 03:34:37 2017
@@ -51,7 +51,6 @@ public class PORankTez extends PORank im
     private transient Map<Integer, Long> counterOffsets;
     private transient Configuration conf;
     private transient boolean finished = false;
-    private transient Boolean hasFirstRecord;
 
     public PORankTez(PORank copy) {
         super(copy);
@@ -101,7 +100,6 @@ public class PORankTez extends PORank im
         try {
             reader = (KeyValueReader) input.getReader();
             LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader);
-            hasFirstRecord = reader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -142,18 +140,9 @@ public class PORankTez extends PORank im
         Result inp = null;
 
         try {
-            if (hasFirstRecord != null) {
-                if (hasFirstRecord) {
-                    hasFirstRecord = null;
-                    inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
-                    return addRank(inp);
-                }
-                hasFirstRecord = null;
-            } else {
-                while (reader.next()) {
-                    inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
-                    return addRank(inp);
-                }
+            while (reader.next()) {
+                inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+                return addRank(inp);
             }
         } catch (IOException e) {
             throw new ExecException(e);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Fri Feb 24 03:34:37 2017
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -34,16 +32,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalCachedBag;
-import org.apache.pig.data.ReadOnceBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -54,7 +48,6 @@ import org.apache.tez.runtime.library.co
 public class POShuffleTezLoad extends POPackage implements TezInput {
 
     private static final long serialVersionUID = 1L;
-    private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class);
 
     protected List<String> inputKeys = new ArrayList<String>();
     private boolean isSkewedJoin = false;
@@ -68,7 +61,6 @@ public class POShuffleTezLoad extends PO
     private transient WritableComparator groupingComparator = null;
     private transient Configuration conf;
     private transient int accumulativeBatchSize;
-    private transient boolean readOnceOneBag;
 
     public POShuffleTezLoad(POPackage pack) {
         super(pack);
@@ -109,10 +101,7 @@ public class POShuffleTezLoad extends PO
                 //     - Input key will be repeated, but index would be same within a TezInput
                 if (!this.inputs.contains(input)) {
                     this.inputs.add(input);
-                    KeyValuesReader reader = (KeyValuesReader)input.getReader();
-                    this.readers.add(reader);
-                    LOG.info("Attached input from vertex " + inputKey
-                            + " : input=" + input + ", reader=" + reader);
+                    this.readers.add((KeyValuesReader)input.getReader());
                 }
             }
 
@@ -128,13 +117,6 @@ public class POShuffleTezLoad extends PO
             for (int i = 0; i < numTezInputs; i++) {
                 finished[i] = !readers.get(i).next();
             }
-
-            this.readOnceOneBag = (numInputs == 1)
-                    && (pkgr instanceof CombinerPackager
-                            || pkgr instanceof LitePackager || pkgr instanceof BloomPackager);
-            if (readOnceOneBag) {
-                readOnce[0] = true;
-            }
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -205,47 +187,43 @@ public class POShuffleTezLoad extends PO
 
                 } else {
 
-                    if (readOnceOneBag) {
-                        bags[0] = new TezReadOnceBag(pkgr, min);
-                    } else {
-                        for (int i = 0; i < numInputs; i++) {
-                            bags[i] = new InternalCachedBag(numInputs);
-                        }
+                    for (int i = 0; i < numInputs; i++) {
+                        bags[i] = new InternalCachedBag(numInputs);
+                    }
 
-                        if (numTezInputs == 1) {
-                            do {
-                                Iterable<Object> vals = readers.get(0).getCurrentValues();
-                                for (Object val : vals) {
-                                    NullableTuple nTup = (NullableTuple) val;
-                                    int index = nTup.getIndex();
-                                    Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
-                                    bags[index].add(tup);
-                                }
-                                finished[0] = !readers.get(0).next();
-                                if (finished[0]) {
-                                    break;
-                                }
-                                cur = readers.get(0).getCurrentKey();
-                            } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
-                        } else {
-                            for (int i = 0; i < numTezInputs; i++) {
-                                if (!finished[i]) {
-                                    cur = readers.get(i).getCurrentKey();
-                                    // We need to loop in case of Grouping Comparators
-                                    while (groupingComparator.compare(min, cur) == 0) {
-                                        Iterable<Object> vals = readers.get(i).getCurrentValues();
-                                        for (Object val : vals) {
-                                            NullableTuple nTup = (NullableTuple) val;
-                                            int index = nTup.getIndex();
-                                            Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
-                                            bags[index].add(tup);
-                                        }
-                                        finished[i] = !readers.get(i).next();
-                                        if (finished[i]) {
-                                            break;
-                                        }
-                                        cur = readers.get(i).getCurrentKey();
+                    if (numTezInputs == 1) {
+                        do {
+                            Iterable<Object> vals = readers.get(0).getCurrentValues();
+                            for (Object val : vals) {
+                                NullableTuple nTup = (NullableTuple) val;
+                                int index = nTup.getIndex();
+                                Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                bags[index].add(tup);
+                            }
+                            finished[0] = !readers.get(0).next();
+                            if (finished[0]) {
+                                break;
+                            }
+                            cur = readers.get(0).getCurrentKey();
+                        } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+                    } else {
+                        for (int i = 0; i < numTezInputs; i++) {
+                            if (!finished[i]) {
+                                cur = readers.get(i).getCurrentKey();
+                                // We need to loop in case of Grouping Comparators
+                                while (groupingComparator.compare(min, cur) == 0) {
+                                    Iterable<Object> vals = readers.get(i).getCurrentValues();
+                                    for (Object val : vals) {
+                                        NullableTuple nTup = (NullableTuple) val;
+                                        int index = nTup.getIndex();
+                                        Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                        bags[index].add(tup);
+                                    }
+                                    finished[i] = !readers.get(i).next();
+                                    if (finished[i]) {
+                                        break;
                                     }
+                                    cur = readers.get(i).getCurrentKey();
                                 }
                             }
                         }
@@ -405,74 +383,4 @@ public class POShuffleTezLoad extends PO
 
     }
 
-    private class TezReadOnceBag extends ReadOnceBag {
-
-        private static final long serialVersionUID = 1L;
-        private Iterator<Object> iter;
-
-        public TezReadOnceBag(Packager pkgr,
-                PigNullableWritable currentKey) throws IOException {
-            this.pkgr = pkgr;
-            this.keyWritable = currentKey;
-            this.iter = readers.get(0).getCurrentValues().iterator();
-        }
-
-        @Override
-        public Iterator<Tuple> iterator() {
-            return new TezReadOnceBagIterator();
-        }
-
-        private class TezReadOnceBagIterator implements Iterator<Tuple> {
-
-            @Override
-            public boolean hasNext() {
-                if (iter.hasNext()) {
-                    return true;
-                } else {
-                    try {
-                        finished[0] = !readers.get(0).next();
-                        if (finished[0]) {
-                            return false;
-                        }
-                        // Currently combiner is not being applied when secondary key(grouping comparator) is used
-                        // But might change in future. So check if the next key is same and return its values
-                        Object cur = readers.get(0).getCurrentKey();
-                        if (groupingComparator.compare(keyWritable, cur) == 0) {
-                            iter = readers.get(0).getCurrentValues().iterator();
-                            // Key should at least have one value. But doing a check just for safety
-                            if (iter.hasNext()) {
-                                return true;
-                            } else {
-                                throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values");
-                            }
-                        }
-                        return false;
-                    } catch (IOException e) {
-                        throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
-                    }
-                }
-            }
-
-            @Override
-            public Tuple next() {
-                NullableTuple ntup = (NullableTuple) iter.next();
-                int index = ntup.getIndex();
-                Tuple ret = null;
-                try {
-                    ret = pkgr.getValueTuple(keyWritable, ntup, index);
-                } catch (ExecException e) {
-                    throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
-                }
-                return ret;
-            }
-
-            @Override
-            public void remove() {
-                throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
-            }
-        }
-
-    }
-
-
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Fri Feb 24 03:34:37 2017
@@ -57,7 +57,6 @@ public class POShuffledValueInputTez ext
     private transient Iterator<KeyValueReader> readers;
     private transient KeyValueReader currentReader;
     private transient Configuration conf;
-    private transient Boolean hasFirstRecord;
 
     public POShuffledValueInputTez(OperatorKey k) {
         super(k);
@@ -99,8 +98,6 @@ public class POShuffledValueInputTez ext
             }
             readers = readersList.iterator();
             currentReader = readers.next();
-            // Force input fetch
-            hasFirstRecord = currentReader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -114,15 +111,7 @@ public class POShuffledValueInputTez ext
             }
 
             do {
-                if (hasFirstRecord != null) {
-                    if (hasFirstRecord) {
-                        hasFirstRecord = null;
-                        Tuple origTuple = (Tuple) currentReader.getCurrentValue();
-                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-                        return new Result(POStatus.STATUS_OK, copy);
-                    }
-                    hasFirstRecord = null;
-                } else if (currentReader.next()) {
+                if (currentReader.next()) {
                     Tuple origTuple = (Tuple) currentReader.getCurrentValue();
                     Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
                     return new Result(POStatus.STATUS_OK, copy);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Fri Feb 24 03:34:37 2017
@@ -60,8 +60,6 @@ public class POSimpleTezLoad extends POL
     private transient Configuration conf;
     private transient boolean finished = false;
     private transient TezCounter inputRecordCounter;
-    private transient boolean initialized;
-    private transient boolean noTupleCopy;
 
     public POSimpleTezLoad(OperatorKey k, LoadFunc loader) {
         super(k, loader);
@@ -151,13 +149,7 @@ public class POSimpleTezLoad extends POL
             } else {
                 Result res = new Result();
                 Tuple next = (Tuple) reader.getCurrentValue();
-                if (!initialized) {
-                    noTupleCopy = mTupleFactory.newTuple(1).getClass().isInstance(next);
-                    initialized = true;
-                }
-                // Some Loaders return implementations of DefaultTuple instead of BinSedesTuple
-                // In that case copy to BinSedesTuple
-                res.result = noTupleCopy ? next : mTupleFactory.newTupleNoCopy(next.getAll());
+                res.result = next;
                 res.returnStatus = POStatus.STATUS_OK;
                 if (inputRecordCounter != null) {
                     inputRecordCounter.increment(1);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Fri Feb 24 03:34:37 2017
@@ -102,19 +102,19 @@ public class POStoreTez extends POStore
             throw new ExecException(e);
         }
 
-        // Even if there is a single hdfs output, we add multi store counter
-        // Makes it easier for user to see records for a particular store from
-        // the DAG counter
-        CounterGroup multiStoreGroup = processorContext.getCounters()
-                .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-        if (multiStoreGroup == null) {
-            processorContext.getCounters().addGroup(
-                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
-                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-        }
-        String name = MRPigStatsUtil.getMultiStoreCounterName(this);
-        if (name != null) {
-            outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
+        // Multiple outputs - can be another store or other outputs (shuffle, broadcast)
+        if (outputs.size() > 1) {
+            CounterGroup multiStoreGroup = processorContext.getCounters()
+                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+            if (multiStoreGroup == null) {
+                processorContext.getCounters().addGroup(
+                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+            }
+            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+            if (name != null) {
+                outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
+            }
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Fri Feb 24 03:34:37 2017
@@ -57,7 +57,6 @@ public class POValueInputTez extends Phy
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean hasNext;
-    private transient Boolean hasFirstRecord;
 
     public POValueInputTez(OperatorKey k) {
         super(k);
@@ -93,8 +92,6 @@ public class POValueInputTez extends Phy
             Reader r = input.getReader();
             if (r instanceof KeyValueReader) {
                 reader = (KeyValueReader) r;
-                // Force input fetch
-                hasFirstRecord = reader.next();
             } else {
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
@@ -121,22 +118,10 @@ public class POValueInputTez extends Phy
                     }
                     hasNext = shuffleReader.next();
                 }
-            } else {
-                if (hasFirstRecord != null) {
-                    if (hasFirstRecord) {
-                        hasFirstRecord = null;
-                        Tuple origTuple = (Tuple) reader.getCurrentValue();
-                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-                        return new Result(POStatus.STATUS_OK, copy);
-                    }
-                    hasFirstRecord = null;
-                } else {
-                    while (reader.next()) {
-                        Tuple origTuple = (Tuple) reader.getCurrentValue();
-                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-                        return new Result(POStatus.STATUS_OK, copy);
-                    }
-                }
+            } else if (reader.next()) {
+                Tuple origTuple = (Tuple) reader.getCurrentValue();
+                Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                return new Result(POStatus.STATUS_OK, copy);
             }
             finished = true;
             // For certain operators (such as STREAM), we could still have some work

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Fri Feb 24 03:34:37 2017
@@ -69,11 +69,6 @@ public class CombinerOptimizer extends T
         }
 
         for (TezOperator from : predecessors) {
-            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
-            if (!combinePlan.isEmpty()) {
-                // Cases like bloom join have combine plan already set
-                continue;
-            }
             List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
             if (rearranges.isEmpty()) {
                 continue;
@@ -82,7 +77,7 @@ public class CombinerOptimizer extends T
             POLocalRearrangeTez connectingLR = null;
             PhysicalPlan rearrangePlan = from.plan;
             for (POLocalRearrangeTez lr : rearranges) {
-                if (lr.containsOutputKey(to.getOperatorKey().toString())) {
+                if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
                     connectingLR = lr;
                     break;
                 }
@@ -95,6 +90,7 @@ public class CombinerOptimizer extends T
 
             // Detected the POLocalRearrange -> POPackage pattern. Let's add
             // combiner if possible.
+            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
             CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
 
             if(!combinePlan.isEmpty()) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Fri Feb 24 03:34:37 2017
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -66,6 +65,11 @@ public class LoaderProcessor extends Tez
         this.jobConf.setBoolean("mapred.mapper.new-api", true);
         this.jobConf.setClass("mapreduce.inputformat.class",
                 PigInputFormat.class, InputFormat.class);
+        try {
+            this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
     }
 
     /**
@@ -171,7 +175,6 @@ public class LoaderProcessor extends Tez
             // splits can be moved to if(loads) block below
             int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
             tezOp.setRequestedParallelism(parallelism);
-            tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job));
         }
         return lds;
     }

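[editor's note] The LoaderProcessor change above serializes the PigContext into the job configuration under the "pig.pigContext" key. A minimal sketch of the round trip this enables, assuming the org.apache.pig.impl.util.ObjectSerializer helpers and omitting the IOException handling shown in the hunk; the reading side is illustrative only and not part of this commit:

    // writing side, as in the constructor above
    // (jobConf is the Hadoop Configuration and pc the PigContext in scope there)
    jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));

    // reading side (e.g. a backend task) -- assumed here for illustration only
    PigContext restored =
            (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
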
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Feb 24 03:34:37 2017
@@ -153,8 +153,6 @@ public class MultiQueryOptimizerTez exte
                     }
                 }
                 if (getPlan().getSuccessors(successor) != null) {
-                    nonPackageInputSuccessors.clear();
-                    toMergeSuccessors.clear();
                     for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
                         if (succSuccessor.isUnion()) {
                             if (!(unionOptimizerOn &&
@@ -173,13 +171,7 @@ public class MultiQueryOptimizerTez exte
                                         continue;
                                     }
                                 }
-                                if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
-                                    // Output goes to scalar or POFRJoinTez in the union operator
-                                    // We need to ensure it is the only one to avoid parallel edges
-                                    canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
-                                } else {
-                                    toMergeSuccessors.add(succSuccessor);
-                                }
+                                toMergeSuccessors.add(succSuccessor);
                                 List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
                                 if (unionSuccessors != null) {
                                     for (TezOperator unionSuccessor : unionSuccessors) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Feb 24 03:34:37 2017
@@ -115,16 +115,11 @@ public class ParallelismSetter extends T
                     } else if (pc.defaultParallel != -1) {
                         parallelism = pc.defaultParallel;
                     }
-                    if (parallelism == 0) {
-                        // We need to produce empty output file.
-                        // Even if user set PARALLEL 0, mapreduce has 1 reducer
-                        parallelism = 1;
-                    }
                     boolean overrideRequestedParallelism = false;
                     if (parallelism != -1
                             && autoParallelismEnabled
-                            && !tezOp.isDontEstimateParallelism()
                             && tezOp.isIntermediateReducer()
+                            && !tezOp.isDontEstimateParallelism()
                             && tezOp.isOverrideIntermediateParallelism()) {
                         overrideRequestedParallelism = true;
                     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Fri Feb 24 03:34:37 2017
@@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex
         POLocalRearrangeTez connectingLR = null;
         PhysicalPlan rearrangePlan = from.plan;
         for (POLocalRearrangeTez lr : rearranges) {
-            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
+            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
                 connectingLR = lr;
                 break;
             }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java Fri Feb 24 03:34:37 2017
@@ -30,8 +30,6 @@ public class TezEstimatedParallelismClea
 
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
-        if (!tezOp.isDontEstimateParallelism()) {
-            tezOp.setEstimatedParallelism(-1);
-        }
+        tezOp.setEstimatedParallelism(-1);
     }
 }