You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 08:19:46 UTC

svn commit: r1784237 [9/22] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Feb 24 08:19:42 2017
@@ -29,9 +29,14 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -160,100 +165,178 @@ public class TezPlanContainer extends Op
             return;
         }
 
-        TezOperator operToSegment = null;
-        List<TezOperator> succs = new ArrayList<TezOperator>();
+        List<TezOperator> opersToSegment = null;
         try {
             // Split top down from root to leaves
-            SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan);
+            // Get list of operators closer to the root that can be segmented together
+            FirstLevelSegmentOperatorsFinder finder = new FirstLevelSegmentOperatorsFinder(tezOperPlan);
             finder.visit();
-            operToSegment = finder.getOperatorToSegment();
+            opersToSegment = finder.getOperatorsToSegment();
         } catch (VisitorException e) {
             throw new PlanException(e);
         }
+        if (!opersToSegment.isEmpty()) {
+            Set<TezOperator> commonSplitterPredecessors = new HashSet<>();
+            for (TezOperator operToSegment : opersToSegment) {
+                for (TezOperator succ : tezOperPlan.getSuccessors(operToSegment)) {
+                    commonSplitterPredecessors
+                            .addAll(getCommonSplitterPredecessors(tezOperPlan,
+                                    operToSegment, succ));
+                }
+            }
 
-        if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) {
-            succs.addAll(tezOperPlan.getSuccessors(operToSegment));
-            for (TezOperator succ : succs) {
-                tezOperPlan.disconnect(operToSegment, succ);
-            }
-            for (TezOperator succ : succs) {
-                try {
-                    if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) {
-                        // Has already been moved to a new plan by previous successor
-                        // as part of dependency. It could have been further split.
-                        // So walk the full plan to find the new plan and connect
-                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
-                        finder.visit();
-                        connect(planNode, finder.getPlanContainerNode());
-                        continue;
+            if (commonSplitterPredecessors.isEmpty()) {
+                List<TezOperator> allSuccs = new ArrayList<TezOperator>();
+                // Disconnect all the successors and move them to a new plan
+                for (TezOperator operToSegment : opersToSegment) {
+                    List<TezOperator> succs = new ArrayList<TezOperator>();
+                    succs.addAll(tezOperPlan.getSuccessors(operToSegment));
+                    allSuccs.addAll(succs);
+                    for (TezOperator succ : succs) {
+                        tezOperPlan.disconnect(operToSegment, succ);
                     }
-                    TezOperPlan newOperPlan = new TezOperPlan();
+                }
+                TezOperPlan newOperPlan = new TezOperPlan();
+                for (TezOperator succ : allSuccs) {
                     tezOperPlan.moveTree(succ, newOperPlan);
-                    TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
-                            generateNodeOperatorKey(), newOperPlan);
-                    add(newPlanNode);
-                    connect(planNode, newPlanNode);
-                    split(newPlanNode);
-                    if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) {
-                        // On further split, the successor moved to a new plan container.
-                        // Connect to that
-                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
-                        finder.visit();
-                        disconnect(planNode, newPlanNode);
-                        connect(planNode, finder.getPlanContainerNode());
+                }
+                TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
+                        generateNodeOperatorKey(), newOperPlan);
+                add(newPlanNode);
+                connect(planNode, newPlanNode);
+                split(newPlanNode);
+            } else {
+                // If there is a common splitter predecessor between operToSegment and the successor,
+                // we have to separate out that split to be able to segment.
+                // So we store the output of split to a temp store and then change the
+                // splittees to load from it.
+                String scope = opersToSegment.get(0).getOperatorKey().getScope();
+                for (TezOperator splitter : commonSplitterPredecessors) {
+                    try {
+                        List<TezOperator> succs = new ArrayList<TezOperator>();
+                        succs.addAll(tezOperPlan.getSuccessors(splitter));
+                        FileSpec fileSpec = TezCompiler.getTempFileSpec(pigContext);
+                        POStore tmpStore = getTmpStore(scope, fileSpec);
+                        // Replace POValueOutputTez with POStore
+                        splitter.plan.remove(splitter.plan.getLeaves().get(0));
+                        splitter.plan.addAsLeaf(tmpStore);
+                        splitter.segmentBelow = true;
+                        splitter.setSplitter(false);
+                        for (TezOperator succ : succs) {
+                            // Replace POValueInputTez with POLoad
+                            POLoad tmpLoad = getTmpLoad(scope, fileSpec);
+                            succ.plan.replace(succ.plan.getRoots().get(0), tmpLoad);
+                        }
+                    } catch (Exception e) {
+                        throw new PlanException(e);
                     }
-                } catch (VisitorException e) {
-                    throw new PlanException(e);
                 }
             }
             split(planNode);
         }
     }
 
-    private static class SegmentOperatorFinder extends TezOpPlanVisitor {
+    private static class FirstLevelSegmentOperatorsFinder extends TezOpPlanVisitor {
 
-        private TezOperator operToSegment;
+        private List<TezOperator> opersToSegment = new ArrayList<>();
 
-        public SegmentOperatorFinder(TezOperPlan plan) {
+        public FirstLevelSegmentOperatorsFinder(TezOperPlan plan) {
             super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         }
 
-        public TezOperator getOperatorToSegment() {
-            return operToSegment;
+        public List<TezOperator> getOperatorsToSegment() {
+            return opersToSegment;
         }
 
         @Override
-        public void visitTezOp(TezOperator tezOperator) throws VisitorException {
-            if (tezOperator.needSegmentBelow() && operToSegment == null) {
-                operToSegment = tezOperator;
+        public void visitTezOp(TezOperator tezOp) throws VisitorException {
+            if (tezOp.needSegmentBelow() && getPlan().getSuccessors(tezOp) != null) {
+                if (opersToSegment.isEmpty()) {
+                    opersToSegment.add(tezOp);
+                } else {
+                    // If the operator does not have dependency on previous
+                    // operators chosen for segmenting then add it to the
+                    // operators to be segmented together
+                    if (!hasPredecessor(tezOp, opersToSegment)) {
+                        opersToSegment.add(tezOp);
+                    }
+                }
             }
         }
 
-    }
-
-    private static class TezOperatorFinder extends TezPlanContainerVisitor {
+        /**
+         * Check if the tezOp has one of the opsToCheck as a predecessor.
+         * It can be a immediate predecessor or multiple levels up.
+         */
+        private boolean hasPredecessor(TezOperator tezOp, List<TezOperator> opsToCheck) {
+            List<TezOperator> predecessors = getPlan().getPredecessors(tezOp);
+            if (predecessors != null) {
+                for (TezOperator pred : predecessors) {
+                    if (opersToSegment.contains(pred)) {
+                        return true;
+                    } else {
+                        if (hasPredecessor(pred, opsToCheck)) {
+                            return true;
+                        }
+                    }
+                }
+            }
+            return false;
+        }
 
-        private TezPlanContainerNode planContainerNode;
-        private TezOperator operatorToFind;
+    }
 
-        public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) {
-            super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan));
-            this.operatorToFind = operatorToFind;
+    private Set<TezOperator> getCommonSplitterPredecessors(TezOperPlan plan, TezOperator operToSegment, TezOperator successor) {
+        Set<TezOperator> splitters1 = new HashSet<>();
+        Set<TezOperator> splitters2 = new HashSet<>();
+        Set<TezOperator> processedPredecessors = new HashSet<>();
+        // Find predecessors which are splitters
+        fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1);
+        if (!splitters1.isEmpty()) {
+            // For the successor, traverse rest of the plan below it and
+            // search the predecessors of its successors to find any predecessor that might be a splitter.
+            Set<TezOperator> allSuccs = new HashSet<>();
+            getAllSuccessors(plan, successor, allSuccs);
+            processedPredecessors.clear();
+            processedPredecessors.add(successor);
+            for (TezOperator succ : allSuccs) {
+                fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2);
+            }
+            // Find the common ones
+            splitters1.retainAll(splitters2);
         }
+        return splitters1;
+    }
 
-        public TezPlanContainerNode getPlanContainerNode() {
-            return planContainerNode;
+    private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp,
+            Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) {
+        List<TezOperator> predecessors = plan.getPredecessors(tezOp);
+        if (predecessors != null) {
+            for (TezOperator pred : predecessors) {
+                // Skip processing already processed predecessor to avoid loops
+                if (processedPredecessors.contains(pred)) {
+                    continue;
+                }
+                if (pred.isSplitter()) {
+                    splitters.add(pred);
+                } else if (!pred.needSegmentBelow()) {
+                    processedPredecessors.add(pred);
+                    fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters);
+                }
+            }
         }
+    }
 
-        @Override
-        public void visitTezPlanContainerNode(
-                TezPlanContainerNode tezPlanContainerNode)
-                throws VisitorException {
-            if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) {
-                planContainerNode = tezPlanContainerNode;
+    private void getAllSuccessors(TezOperPlan plan, TezOperator tezOp, Set<TezOperator> allSuccs) {
+        List<TezOperator> successors = plan.getSuccessors(tezOp);
+        if (successors != null) {
+            for (TezOperator succ : successors) {
+                if (!allSuccs.contains(succ)) {
+                    allSuccs.add(succ);
+                    getAllSuccessors(plan, succ, allSuccs);
+                }
             }
         }
-
     }
 
     private synchronized OperatorKey generateNodeOperatorKey() {
@@ -267,6 +350,21 @@ public class TezPlanContainer extends Op
         scopeId = 0;
     }
 
+    private POLoad getTmpLoad(String scope, FileSpec fileSpec){
+        POLoad ld = new POLoad(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+        ld.setPc(pigContext);
+        ld.setIsTmpLoad(true);
+        ld.setLFile(fileSpec);
+        return ld;
+    }
+
+    private POStore getTmpStore(String scope, FileSpec fileSpec){
+        POStore st = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+        st.setIsTmpStore(true);
+        st.setSFile(fileSpec);
+        return new POStoreTez(st);
+    }
+
     @Override
     public String toString() {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Fri Feb 24 08:19:42 2017
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
@@ -80,6 +81,9 @@ public class TezPrinter extends TezOpPla
                     printer.setVerbose(isVerbose);
                     printer.visit();
                     mStream.println();
+                } else if (edgeDesc.needsDistinctCombiner()) {
+                    mStream.println("# Combine plan on edge <" + inEdge + ">");
+                    mStream.println(DistinctCombiner.Combine.class.getName());
                 }
             }
         }

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class BloomPackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    private boolean bloomCreatedInMap;
+    private int vectorSizeBytes;
+    private int numHash;
+    private int hashType;
+    private byte bloomKeyType;
+    private boolean isCombiner;
+
+    private transient ByteArrayOutputStream baos;
+    private transient Iterator<Object> distinctKeyIter;
+
+    public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes,
+            int numHash, int hashType) {
+        super();
+        this.bloomCreatedInMap = bloomCreatedInMap;
+        this.vectorSizeBytes = vectorSizeBytes;
+        this.numHash = numHash;
+        this.hashType = hashType;
+    }
+
+    public void setBloomKeyType(byte keyType) {
+        bloomKeyType = keyType;
+    }
+
+    public void setCombiner(boolean isCombiner) {
+        this.isCombiner = isCombiner;
+    }
+
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
+    @Override
+    public Result getNext() throws ExecException {
+        try {
+            if (bloomCreatedInMap) {
+                if (bags == null) {
+                    return new Result(POStatus.STATUS_EOP, null);
+                }
+                // Same function for combiner and reducer
+                return combineBloomFilters();
+            } else {
+                if (isCombiner) {
+                    return getDistinctBloomKeys();
+                } else {
+                    if (bags == null) {
+                        return new Result(POStatus.STATUS_EOP, null);
+                    }
+                    return createBloomFilter();
+                }
+            }
+        } catch (IOException e) {
+            throw new ExecException("Error while constructing final bloom filter", e);
+        }
+    }
+
+    private Result combineBloomFilters() throws IOException {
+        // We get a bag of bloom filters. combine them into one
+        Iterator<Tuple> iter = bags[0].iterator();
+        Tuple tup = iter.next();
+        DataByteArray bloomBytes = (DataByteArray) tup.get(0);
+        BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+        while (iter.hasNext()) {
+            tup = iter.next();
+            bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+        }
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+
+        return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length);
+    }
+
+    private Result createBloomFilter() throws IOException {
+        // We get a bag of keys. Create a bloom filter from them
+        // First do distinct of the keys. Not using DistinctBag as memory should not be a problem.
+        HashSet<Object> bloomKeys = new HashSet<>();
+        Iterator<Tuple> iter = bags[0].iterator();
+        while (iter.hasNext()) {
+            bloomKeys.add(iter.next().get(0));
+        }
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+
+        BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+        for (Object bloomKey: bloomKeys) {
+            Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType));
+            bloomFilter.add(k);
+        }
+        bloomKeys = null;
+        return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64);
+
+    }
+
+    private Result getSerializedBloomFilter(Object partition,
+            BloomFilter bloomFilter, int serializedSize) throws ExecException,
+            IOException {
+        if (baos == null) {
+            baos = new ByteArrayOutputStream(serializedSize);
+        }
+        baos.reset();
+        DataOutputStream dos = new DataOutputStream(baos);
+        bloomFilter.write(dos);
+        dos.flush();
+
+        Tuple res = mTupleFactory.newTuple(2);
+        res.set(0, partition);
+        res.set(1, new DataByteArray(baos.toByteArray()));
+
+        Result r = new Result();
+        r.result = res;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+    }
+
+    private Result getDistinctBloomKeys() throws ExecException {
+        if (distinctKeyIter == null) {
+            HashSet<Object> bloomKeys = new HashSet<>();
+            Iterator<Tuple> iter = bags[0].iterator();
+            while (iter.hasNext()) {
+                bloomKeys.add(iter.next().get(0));
+            }
+            distinctKeyIter = bloomKeys.iterator();
+        }
+        while (distinctKeyIter.hasNext()) {
+            Tuple res = mTupleFactory.newTuple(2);
+            res.set(0, key);
+            res.set(1, distinctKeyIter.next());
+
+            Result r = new Result();
+            r.result = res;
+            r.returnStatus = POStatus.STATUS_OK;
+            return r;
+        }
+        distinctKeyIter = null;
+        return new Result(POStatus.STATUS_EOP, null);
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput {
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class);
+    private String inputKey;
+    private transient KeyValueReader reader;
+    private transient String cacheKey;
+    private int numBloomFilters;
+    private transient BloomFilter[] bloomFilters;
+
+    public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) {
+        super(lr);
+        this.numBloomFilters = numBloomFilters;
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(inputKey)) {
+            inputKey = newInputKey;
+        }
+    }
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        cacheKey = "bloom-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf) throws ExecException {
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            bloomFilters = (BloomFilter[]) cacheValue;
+            return;
+        }
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+            while (reader.next()) {
+                if (bloomFilters == null) {
+                    bloomFilters = new BloomFilter[numBloomFilters];
+                }
+                Tuple val = (Tuple) reader.getCurrentValue();
+                int index = (int) val.get(0);
+                bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1));
+            }
+            ObjectCache.getInstance().cache(cacheKey, bloomFilters);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+
+        // If there is no bloom filter, then it means right input was empty
+        // Skip processing
+        if (bloomFilters == null) {
+            return RESULT_EOP;
+        }
+
+        while (true) {
+            res = super.getRearrangedTuple();
+            try {
+                switch (res.returnStatus) {
+                case POStatus.STATUS_OK:
+                    if (illustrator == null) {
+                        Tuple result = (Tuple) res.result;
+                        Byte index = (Byte) result.get(0);
+
+                        // Skip the record if key is not in the bloom filter
+                        if (!isKeyInBloomFilter(result.get(1))) {
+                            continue;
+                        }
+                        PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+                        NullableTuple val = new NullableTuple((Tuple)result.get(2));
+                        key.setIndex(index);
+                        val.setIndex(index);
+                        writer.write(key, val);
+                    } else {
+                        illustratorMarkup(res.result, res.result, 0);
+                    }
+                    continue;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                case POStatus.STATUS_ERR:
+                default:
+                    return res;
+                }
+            } catch (IOException ioe) {
+                int errCode = 2135;
+                String msg = "Received error from POBloomFilterRearrage function." + ioe.getMessage();
+                throw new ExecException(msg, errCode, ioe);
+            }
+        }
+    }
+
+    private boolean isKeyInBloomFilter(Object key) throws ExecException {
+        if (key == null) {
+            // Null values are dropped in a inner join and in the case of outer join,
+            // POBloomFilterRearrangeTez is only in the plan on the non outer relation.
+            // So just skip them
+            return false;
+        }
+        if (bloomFilters.length == 1) {
+            // Skip computing hashcode
+            Key k = new Key(DataType.toBytes(key, keyType));
+            return bloomFilters[0].membershipTest(k);
+        } else {
+            int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+            BloomFilter filter = bloomFilters[partition];
+            if (filter != null) {
+                Key k = new Key(DataType.toBytes(key, keyType));
+                return filter.membershipTest(k);
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException {
+        return (POBloomFilterRearrangeTez) super.clone();
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "BloomFilter Rearrange" + "["
+                + DataType.findTypeName(resultType) + "]" + "{"
+                + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+                + ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+    }
+
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * This operator writes out the key value for the hash join reduce operation similar to POLocalRearrangeTez.
+ * In addition, it also writes out the bloom filter constructed from the join keys
+ * in the case of bloomjoin map strategy or join keys themselves in case of reduce strategy.
+ *
+ * Using multiple bloom filters partitioned by the hash of the key allows for parallelism.
+ * It also allows us to have lower false positives with smaller vector sizes.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBuildBloomRearrangeTez extends POLocalRearrangeTez {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class);
+
+    public static final String DEFAULT_BLOOM_STRATEGY = "map";
+    public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11;
+    public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3;
+    public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur";
+    public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024;
+
+    private String bloomOutputKey;
+    private boolean skipNullKeys = false;
+    private boolean createBloomInMap;
+    private int numBloomFilters;
+    private int vectorSizeBytes;
+    private int numHash;
+    private int hashType;
+
+    private transient BloomFilter[] bloomFilters;
+    private transient KeyValueWriter bloomWriter;
+    private transient PigNullableWritable nullKey;
+    private transient Tuple bloomValue;
+    private transient NullableTuple bloomNullableTuple;
+
+    public POBuildBloomRearrangeTez(POLocalRearrangeTez lr,
+            boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes,
+            int numHash, int hashType) {
+        super(lr);
+        this.createBloomInMap = createBloomInMap;
+        this.numBloomFilters = numBloomFilters;
+        this.vectorSizeBytes = vectorSizeBytes;
+        this.numHash = numHash;
+        this.hashType = hashType;
+    }
+
+    public static int getNumBloomFilters(Configuration conf) {
+        if ("map".equals(conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY))) {
+            return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, 1);
+        } else {
+            return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, DEFAULT_NUM_BLOOM_FILTERS_REDUCE);
+        }
+    }
+
+    public void setSkipNullKeys(boolean skipNullKeys) {
+        this.skipNullKeys = skipNullKeys;
+    }
+
+    public void setBloomOutputKey(String bloomOutputKey) {
+        this.bloomOutputKey = bloomOutputKey;
+    }
+
+    @Override
+    public boolean containsOutputKey(String key) {
+        if(super.containsOutputKey(key)) {
+            return true;
+        }
+        return bloomOutputKey.equals(key);
+    }
+
+    @Override
+    public String[] getTezOutputs() {
+        return new String[] { outputKey, bloomOutputKey };
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        if (oldOutputKey.equals(outputKey)) {
+            outputKey = newOutputKey;
+        } else if (oldOutputKey.equals(bloomOutputKey)) {
+            bloomOutputKey = newOutputKey;
+        }
+    }
+
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        super.attachOutputs(outputs, conf);
+        LogicalOutput output = outputs.get(bloomOutputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + bloomOutputKey + " is missing");
+        }
+        try {
+            bloomWriter = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + bloomOutputKey + " : output=" + output + ", writer=" + bloomWriter);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+        bloomFilters = new BloomFilter[numBloomFilters];
+        bloomValue = mTupleFactory.newTuple(1);
+        bloomNullableTuple = new NullableTuple(bloomValue);
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+
+        PigNullableWritable key;
+        while (true) {
+            res = super.getRearrangedTuple();
+            try {
+                switch (res.returnStatus) {
+                case POStatus.STATUS_OK:
+                    if (illustrator == null) {
+                        Tuple result = (Tuple) res.result;
+                        Byte index = (Byte) result.get(0);
+
+                        Object keyObj = result.get(1);
+                        if (keyObj != null) {
+                            key = HDataType.getWritableComparableTypes(keyObj, keyType);
+                            // null keys cannot be part of bloom filter
+                            // Since they are also dropped during join we can skip them
+                            if (createBloomInMap) {
+                                addKeyToBloomFilter(keyObj);
+                            } else {
+                                writeJoinKeyForBloom(keyObj);
+                            }
+                        } else if (skipNullKeys) {
+                            // Inner join. So don't bother writing null key
+                            continue;
+                        } else {
+                            if (nullKey == null) {
+                                nullKey = HDataType.getWritableComparableTypes(keyObj, keyType);
+                            }
+                            key = nullKey;
+                        }
+
+                        NullableTuple val = new NullableTuple((Tuple)result.get(2));
+                        key.setIndex(index);
+                        val.setIndex(index);
+                        writer.write(key, val);
+                    } else {
+                        illustratorMarkup(res.result, res.result, 0);
+                    }
+                    continue;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                    if (this.parentPlan.endOfAllInput && createBloomInMap) {
+                        // In case of Split will get EOP after every record.
+                        // So check for endOfAllInput
+                        writeBloomFilters();
+                    }
+                case POStatus.STATUS_ERR:
+                default:
+                    return res;
+                }
+            } catch (IOException ioe) {
+                int errCode = 2135;
+                String msg = "Received error from POBuildBloomRearrage function." + ioe.getMessage();
+                throw new ExecException(msg, errCode, ioe);
+            }
+        }
+    }
+
+    private void addKeyToBloomFilter(Object key) throws ExecException {
+        Key k = new Key(DataType.toBytes(key, keyType));
+        if (bloomFilters.length == 1) {
+            if (bloomFilters[0] == null) {
+                bloomFilters[0] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+            }
+            bloomFilters[0].add(k);
+        } else {
+            int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+            BloomFilter filter = bloomFilters[partition];
+            if (filter == null) {
+                filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+                bloomFilters[partition] = filter;
+            }
+            filter.add(k);
+        }
+    }
+
+    private void writeJoinKeyForBloom(Object key) throws IOException {
+        int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+        bloomValue.set(0, key);
+        bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
+    }
+
+    private void writeBloomFilters() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64);
+        for (int i = 0; i < bloomFilters.length; i++) {
+            if (bloomFilters[i] != null) {
+                DataOutputStream dos = new DataOutputStream(baos);
+                bloomFilters[i].write(dos);
+                dos.flush();
+                bloomValue.set(0, new DataByteArray(baos.toByteArray()));
+                bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple);
+                baos.reset();
+            }
+        }
+    }
+
+    @Override
+    public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException {
+        return (POBuildBloomRearrangeTez) super.clone();
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "BuildBloom Rearrange" + "["
+                + DataType.findTypeName(resultType) + "]" + "{"
+                + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+                + ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]";
+    }
+
+}

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java Fri Feb 24 08:19:42 2017
@@ -56,6 +56,7 @@ public class POCounterStatsTez extends P
     private transient KeyValuesReader reader;
     private transient KeyValueWriter writer;
     private transient boolean finished = false;
+    private transient boolean hasNext = false;
 
     public POCounterStatsTez(OperatorKey k) {
         super(k);
@@ -88,6 +89,7 @@ public class POCounterStatsTez extends P
         try {
             reader = (KeyValuesReader) input.getReader();
             LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+            hasNext = reader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -130,12 +132,13 @@ public class POCounterStatsTez extends P
             Integer key = null;
             Long value = null;
             // Read count of records per task
-            while (reader.next()) {
+            while (hasNext) {
                 key = ((IntWritable)reader.getCurrentKey()).get();
                 for (Object val : reader.getCurrentValues()) {
                     value = ((LongWritable)val).get();
                     counterRecords.put(key, value);
                 }
+                hasNext = reader.next();
             }
 
             // BinInterSedes only takes String for map key

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Fri Feb 24 08:19:42 2017
@@ -19,6 +19,8 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -101,9 +103,13 @@ public class POFRJoinTez extends POFRJoi
                 LogicalInput input = inputs.get(key);
                 if (!this.replInputs.contains(input)) {
                     this.replInputs.add(input);
-                    this.replReaders.add((KeyValueReader) input.getReader());
+                    KeyValueReader reader = (KeyValueReader) input.getReader();
+                    this.replReaders.add(reader);
+                    log.info("Attached input from vertex " + key + " : input=" + input + ", reader=" + reader);
                 }
             }
+            // Do not force fetch input by reading first record. Cases like MultiQuery_Union_4 have
+            // multiple POFRJoinTez loading same replicate input and will skip records
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -114,6 +120,7 @@ public class POFRJoinTez extends POFRJoi
      *
      * @throws ExecException
      */
+    @SuppressWarnings("unchecked")
     @Override
     protected void setUpHashMap() throws ExecException {
 
@@ -121,8 +128,8 @@ public class POFRJoinTez extends POFRJoi
         // where same POFRJoinTez occurs in different Split sub-plans
         Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
         if (cacheValue != null) {
-            replicates = (TupleToMapKey[]) cacheValue;
-            log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
+            replicates =  (List<Map<? extends Object, ? extends List<Tuple>>>) cacheValue;
+            log.info("Found " + (replicates.size() - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
             return;
         }
 
@@ -148,7 +155,7 @@ public class POFRJoinTez extends POFRJoi
 
         long time1 = System.currentTimeMillis();
 
-        replicates[fragment] = null;
+        replicates.set(fragment, null);
         int inputIdx = 0;
         // We need to adjust the index because the number of replInputs is
         // one less than the number of inputSchemas. The inputSchemas
@@ -158,7 +165,12 @@ public class POFRJoinTez extends POFRJoi
             SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[schemaIdx];
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[schemaIdx];
 
-            TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
+            Map<Object, ArrayList<Tuple>> replicate;
+            if (keySchemaTupleFactory == null) {
+                replicate = new HashMap<Object, ArrayList<Tuple>>(4000);
+            } else {
+                replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
+            }
             POLocalRearrange lr = LRs[schemaIdx];
 
             try {
@@ -168,7 +180,8 @@ public class POFRJoinTez extends POFRJoi
                     }
 
                     PigNullableWritable key = (PigNullableWritable) replReaders.get(inputIdx).getCurrentKey();
-                    if (isKeyNull(key.getValueAsPigType())) continue;
+                    Object keyValue = key.getValueAsPigType();
+                    if (isKeyNull(keyValue)) continue;
                     NullableTuple val = (NullableTuple) replReaders.get(inputIdx).getCurrentValue();
 
                     // POFRJoin#getValueTuple() is reused to construct valTuple,
@@ -176,27 +189,31 @@ public class POFRJoinTez extends POFRJoi
                     // construct one here.
                     Tuple retTuple = mTupleFactory.newTuple(3);
                     retTuple.set(0, key.getIndex());
-                    retTuple.set(1, key.getValueAsPigType());
+                    retTuple.set(1, keyValue);
                     retTuple.set(2, val.getValueAsPigType());
                     Tuple valTuple = getValueTuple(lr, retTuple);
 
-                    Tuple keyTuple = mTupleFactory.newTuple(1);
-                    keyTuple.set(0, key.getValueAsPigType());
-                    if (replicate.get(keyTuple) == null) {
-                        replicate.put(keyTuple, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+                    ArrayList<Tuple> values = replicate.get(keyValue);
+                    if (values == null) {
+                        if (inputSchemaTupleFactory == null) {
+                            values = new ArrayList<Tuple>(1);
+                        } else {
+                            values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
+                        }
+                        replicate.put(keyValue, values);
                     }
-                    replicate.get(keyTuple).add(valTuple);
+                    values.add(valTuple);
                 }
             } catch (IOException e) {
                 throw new ExecException(e);
             }
-            replicates[schemaIdx] = replicate;
+            replicates.set(schemaIdx, replicate);
             inputIdx++;
             schemaIdx++;
         }
 
         long time2 = System.currentTimeMillis();
-        log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
+        log.info((replicates.size() - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
 
         ObjectCache.getInstance().cache(cacheKey, replicates);
         log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Fri Feb 24 08:19:42 2017
@@ -57,6 +57,7 @@ public class POIdentityInOutTez extends
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean finished = false;
+    private transient boolean hasNext = false;
 
     public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey) {
         super(inputRearrange);
@@ -95,9 +96,12 @@ public class POIdentityInOutTez extends
             Reader r = input.getReader();
             if (r instanceof KeyValueReader) {
                 reader = (KeyValueReader) r;
+                // Force input fetch
+                hasNext = reader.next();
             } else {
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
+                hasNext = shuffleReader.next();
             }
             LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r);
         } catch (Exception e) {
@@ -127,7 +131,7 @@ public class POIdentityInOutTez extends
                 return RESULT_EOP;
             }
             if (shuffleInput) {
-                while (shuffleReader.next()) {
+                while (hasNext) {
                     Object curKey = shuffleReader.getCurrentKey();
                     Iterable<Object> vals = shuffleReader.getCurrentValues();
                     if (isSkewedJoin) {
@@ -139,9 +143,10 @@ public class POIdentityInOutTez extends
                     for (Object val : vals) {
                         writer.write(curKey, val);
                     }
+                    hasNext = shuffleReader.next();
                 }
             } else {
-                while (reader.next()) {
+                while (hasNext) {
                     if (isSkewedJoin) {
                         NullablePartitionWritable wrappedKey = new NullablePartitionWritable(
                                 (PigNullableWritable) reader.getCurrentKey());
@@ -155,6 +160,7 @@ public class POIdentityInOutTez extends
                         writer.write(reader.getCurrentKey(),
                                 reader.getCurrentValue());
                     }
+                    hasNext = reader.next();
                 }
             }
             finished = true;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Fri Feb 24 08:19:42 2017
@@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends
         }
     }
 
-    public String getOutputKey() {
-        return outputKey;
+    public boolean containsOutputKey(String key) {
+        return outputKey.equals(key);
     }
 
     public void setOutputKey(String outputKey) {
@@ -122,6 +122,10 @@ public class POLocalRearrangeTez extends
         }
     }
 
+    protected Result getRearrangedTuple() throws ExecException {
+        return super.getNextTuple();
+    }
+
     @Override
     public Result getNextTuple() throws ExecException {
         res = super.getNextTuple();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java Fri Feb 24 08:19:42 2017
@@ -51,6 +51,7 @@ public class PORankTez extends PORank im
     private transient Map<Integer, Long> counterOffsets;
     private transient Configuration conf;
     private transient boolean finished = false;
+    private transient Boolean hasFirstRecord;
 
     public PORankTez(PORank copy) {
         super(copy);
@@ -100,6 +101,7 @@ public class PORankTez extends PORank im
         try {
             reader = (KeyValueReader) input.getReader();
             LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader);
+            hasFirstRecord = reader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -140,9 +142,18 @@ public class PORankTez extends PORank im
         Result inp = null;
 
         try {
-            while (reader.next()) {
-                inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
-                return addRank(inp);
+            if (hasFirstRecord != null) {
+                if (hasFirstRecord) {
+                    hasFirstRecord = null;
+                    inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+                    return addRank(inp);
+                }
+                hasFirstRecord = null;
+            } else {
+                while (reader.next()) {
+                    inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+                    return addRank(inp);
+                }
             }
         } catch (IOException e) {
             throw new ExecException(e);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Fri Feb 24 08:19:42 2017
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -32,12 +34,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +54,7 @@ import org.apache.tez.runtime.library.co
 public class POShuffleTezLoad extends POPackage implements TezInput {
 
     private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class);
 
     protected List<String> inputKeys = new ArrayList<String>();
     private boolean isSkewedJoin = false;
@@ -61,6 +68,7 @@ public class POShuffleTezLoad extends PO
     private transient WritableComparator groupingComparator = null;
     private transient Configuration conf;
     private transient int accumulativeBatchSize;
+    private transient boolean readOnceOneBag;
 
     public POShuffleTezLoad(POPackage pack) {
         super(pack);
@@ -101,7 +109,10 @@ public class POShuffleTezLoad extends PO
                 //     - Input key will be repeated, but index would be same within a TezInput
                 if (!this.inputs.contains(input)) {
                     this.inputs.add(input);
-                    this.readers.add((KeyValuesReader)input.getReader());
+                    KeyValuesReader reader = (KeyValuesReader)input.getReader();
+                    this.readers.add(reader);
+                    LOG.info("Attached input from vertex " + inputKey
+                            + " : input=" + input + ", reader=" + reader);
                 }
             }
 
@@ -117,6 +128,13 @@ public class POShuffleTezLoad extends PO
             for (int i = 0; i < numTezInputs; i++) {
                 finished[i] = !readers.get(i).next();
             }
+
+            this.readOnceOneBag = (numInputs == 1)
+                    && (pkgr instanceof CombinerPackager
+                            || pkgr instanceof LitePackager || pkgr instanceof BloomPackager);
+            if (readOnceOneBag) {
+                readOnce[0] = true;
+            }
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -187,43 +205,47 @@ public class POShuffleTezLoad extends PO
 
                 } else {
 
-                    for (int i = 0; i < numInputs; i++) {
-                        bags[i] = new InternalCachedBag(numInputs);
-                    }
-
-                    if (numTezInputs == 1) {
-                        do {
-                            Iterable<Object> vals = readers.get(0).getCurrentValues();
-                            for (Object val : vals) {
-                                NullableTuple nTup = (NullableTuple) val;
-                                int index = nTup.getIndex();
-                                Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
-                                bags[index].add(tup);
-                            }
-                            finished[0] = !readers.get(0).next();
-                            if (finished[0]) {
-                                break;
-                            }
-                            cur = readers.get(0).getCurrentKey();
-                        } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+                    if (readOnceOneBag) {
+                        bags[0] = new TezReadOnceBag(pkgr, min);
                     } else {
-                        for (int i = 0; i < numTezInputs; i++) {
-                            if (!finished[i]) {
-                                cur = readers.get(i).getCurrentKey();
-                                // We need to loop in case of Grouping Comparators
-                                while (groupingComparator.compare(min, cur) == 0) {
-                                    Iterable<Object> vals = readers.get(i).getCurrentValues();
-                                    for (Object val : vals) {
-                                        NullableTuple nTup = (NullableTuple) val;
-                                        int index = nTup.getIndex();
-                                        Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
-                                        bags[index].add(tup);
-                                    }
-                                    finished[i] = !readers.get(i).next();
-                                    if (finished[i]) {
-                                        break;
-                                    }
+                        for (int i = 0; i < numInputs; i++) {
+                            bags[i] = new InternalCachedBag(numInputs);
+                        }
+
+                        if (numTezInputs == 1) {
+                            do {
+                                Iterable<Object> vals = readers.get(0).getCurrentValues();
+                                for (Object val : vals) {
+                                    NullableTuple nTup = (NullableTuple) val;
+                                    int index = nTup.getIndex();
+                                    Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                    bags[index].add(tup);
+                                }
+                                finished[0] = !readers.get(0).next();
+                                if (finished[0]) {
+                                    break;
+                                }
+                                cur = readers.get(0).getCurrentKey();
+                            } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+                        } else {
+                            for (int i = 0; i < numTezInputs; i++) {
+                                if (!finished[i]) {
                                     cur = readers.get(i).getCurrentKey();
+                                    // We need to loop in case of Grouping Comparators
+                                    while (groupingComparator.compare(min, cur) == 0) {
+                                        Iterable<Object> vals = readers.get(i).getCurrentValues();
+                                        for (Object val : vals) {
+                                            NullableTuple nTup = (NullableTuple) val;
+                                            int index = nTup.getIndex();
+                                            Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+                                            bags[index].add(tup);
+                                        }
+                                        finished[i] = !readers.get(i).next();
+                                        if (finished[i]) {
+                                            break;
+                                        }
+                                        cur = readers.get(i).getCurrentKey();
+                                    }
                                 }
                             }
                         }
@@ -383,4 +405,74 @@ public class POShuffleTezLoad extends PO
 
     }
 
+    private class TezReadOnceBag extends ReadOnceBag {
+
+        private static final long serialVersionUID = 1L;
+        private Iterator<Object> iter;
+
+        public TezReadOnceBag(Packager pkgr,
+                PigNullableWritable currentKey) throws IOException {
+            this.pkgr = pkgr;
+            this.keyWritable = currentKey;
+            this.iter = readers.get(0).getCurrentValues().iterator();
+        }
+
+        @Override
+        public Iterator<Tuple> iterator() {
+            return new TezReadOnceBagIterator();
+        }
+
+        private class TezReadOnceBagIterator implements Iterator<Tuple> {
+
+            @Override
+            public boolean hasNext() {
+                if (iter.hasNext()) {
+                    return true;
+                } else {
+                    try {
+                        finished[0] = !readers.get(0).next();
+                        if (finished[0]) {
+                            return false;
+                        }
+                        // Currently combiner is not being applied when secondary key(grouping comparator) is used
+                        // But might change in future. So check if the next key is same and return its values
+                        Object cur = readers.get(0).getCurrentKey();
+                        if (groupingComparator.compare(keyWritable, cur) == 0) {
+                            iter = readers.get(0).getCurrentValues().iterator();
+                            // Key should at least have one value. But doing a check just for safety
+                            if (iter.hasNext()) {
+                                return true;
+                            } else {
+                                throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values");
+                            }
+                        }
+                        return false;
+                    } catch (IOException e) {
+                        throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+                    }
+                }
+            }
+
+            @Override
+            public Tuple next() {
+                NullableTuple ntup = (NullableTuple) iter.next();
+                int index = ntup.getIndex();
+                Tuple ret = null;
+                try {
+                    ret = pkgr.getValueTuple(keyWritable, ntup, index);
+                } catch (ExecException e) {
+                    throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+                }
+                return ret;
+            }
+
+            @Override
+            public void remove() {
+                throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
+            }
+        }
+
+    }
+
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Fri Feb 24 08:19:42 2017
@@ -57,6 +57,7 @@ public class POShuffledValueInputTez ext
     private transient Iterator<KeyValueReader> readers;
     private transient KeyValueReader currentReader;
     private transient Configuration conf;
+    private transient Boolean hasFirstRecord;
 
     public POShuffledValueInputTez(OperatorKey k) {
         super(k);
@@ -98,6 +99,8 @@ public class POShuffledValueInputTez ext
             }
             readers = readersList.iterator();
             currentReader = readers.next();
+            // Force input fetch
+            hasFirstRecord = currentReader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -111,7 +114,15 @@ public class POShuffledValueInputTez ext
             }
 
             do {
-                if (currentReader.next()) {
+                if (hasFirstRecord != null) {
+                    if (hasFirstRecord) {
+                        hasFirstRecord = null;
+                        Tuple origTuple = (Tuple) currentReader.getCurrentValue();
+                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                    hasFirstRecord = null;
+                } else if (currentReader.next()) {
                     Tuple origTuple = (Tuple) currentReader.getCurrentValue();
                     Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
                     return new Result(POStatus.STATUS_OK, copy);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Fri Feb 24 08:19:42 2017
@@ -60,6 +60,8 @@ public class POSimpleTezLoad extends POL
     private transient Configuration conf;
     private transient boolean finished = false;
     private transient TezCounter inputRecordCounter;
+    private transient boolean initialized;
+    private transient boolean noTupleCopy;
 
     public POSimpleTezLoad(OperatorKey k, LoadFunc loader) {
         super(k, loader);
@@ -149,7 +151,13 @@ public class POSimpleTezLoad extends POL
             } else {
                 Result res = new Result();
                 Tuple next = (Tuple) reader.getCurrentValue();
-                res.result = next;
+                if (!initialized) {
+                    noTupleCopy = mTupleFactory.newTuple(1).getClass().isInstance(next);
+                    initialized = true;
+                }
+                // Some Loaders return implementations of DefaultTuple instead of BinSedesTuple
+                // In that case copy to BinSedesTuple
+                res.result = noTupleCopy ? next : mTupleFactory.newTupleNoCopy(next.getAll());
                 res.returnStatus = POStatus.STATUS_OK;
                 if (inputRecordCounter != null) {
                     inputRecordCounter.increment(1);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Fri Feb 24 08:19:42 2017
@@ -102,19 +102,19 @@ public class POStoreTez extends POStore
             throw new ExecException(e);
         }
 
-        // Multiple outputs - can be another store or other outputs (shuffle, broadcast)
-        if (outputs.size() > 1) {
-            CounterGroup multiStoreGroup = processorContext.getCounters()
-                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            if (multiStoreGroup == null) {
-                processorContext.getCounters().addGroup(
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            }
-            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
-            if (name != null) {
-                outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
-            }
+        // Even if there is a single hdfs output, we add multi store counter
+        // Makes it easier for user to see records for a particular store from
+        // the DAG counter
+        CounterGroup multiStoreGroup = processorContext.getCounters()
+                .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        if (multiStoreGroup == null) {
+            processorContext.getCounters().addGroup(
+                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        }
+        String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+        if (name != null) {
+            outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Fri Feb 24 08:19:42 2017
@@ -57,6 +57,7 @@ public class POValueInputTez extends Phy
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean hasNext;
+    private transient Boolean hasFirstRecord;
 
     public POValueInputTez(OperatorKey k) {
         super(k);
@@ -92,6 +93,8 @@ public class POValueInputTez extends Phy
             Reader r = input.getReader();
             if (r instanceof KeyValueReader) {
                 reader = (KeyValueReader) r;
+                // Force input fetch
+                hasFirstRecord = reader.next();
             } else {
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
@@ -118,10 +121,22 @@ public class POValueInputTez extends Phy
                     }
                     hasNext = shuffleReader.next();
                 }
-            } else if (reader.next()) {
-                Tuple origTuple = (Tuple) reader.getCurrentValue();
-                Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-                return new Result(POStatus.STATUS_OK, copy);
+            } else {
+                if (hasFirstRecord != null) {
+                    if (hasFirstRecord) {
+                        hasFirstRecord = null;
+                        Tuple origTuple = (Tuple) reader.getCurrentValue();
+                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                    hasFirstRecord = null;
+                } else {
+                    while (reader.next()) {
+                        Tuple origTuple = (Tuple) reader.getCurrentValue();
+                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                }
             }
             finished = true;
             // For certain operators (such as STREAM), we could still have some work

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Fri Feb 24 08:19:42 2017
@@ -69,6 +69,11 @@ public class CombinerOptimizer extends T
         }
 
         for (TezOperator from : predecessors) {
+            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
+            if (!combinePlan.isEmpty()) {
+                // Cases like bloom join have combine plan already set
+                continue;
+            }
             List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
             if (rearranges.isEmpty()) {
                 continue;
@@ -77,7 +82,7 @@ public class CombinerOptimizer extends T
             POLocalRearrangeTez connectingLR = null;
             PhysicalPlan rearrangePlan = from.plan;
             for (POLocalRearrangeTez lr : rearranges) {
-                if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+                if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                     connectingLR = lr;
                     break;
                 }
@@ -90,7 +95,6 @@ public class CombinerOptimizer extends T
 
             // Detected the POLocalRearrange -> POPackage pattern. Let's add
             // combiner if possible.
-            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
             CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
 
             if(!combinePlan.isEmpty()) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -65,11 +66,6 @@ public class LoaderProcessor extends Tez
         this.jobConf.setBoolean("mapred.mapper.new-api", true);
         this.jobConf.setClass("mapreduce.inputformat.class",
                 PigInputFormat.class, InputFormat.class);
-        try {
-            this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        } catch (IOException e) {
-            throw new VisitorException(e);
-        }
     }
 
     /**
@@ -175,6 +171,7 @@ public class LoaderProcessor extends Tez
             // splits can be moved to if(loads) block below
             int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
             tezOp.setRequestedParallelism(parallelism);
+            tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job));
         }
         return lds;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Feb 24 08:19:42 2017
@@ -153,6 +153,8 @@ public class MultiQueryOptimizerTez exte
                     }
                 }
                 if (getPlan().getSuccessors(successor) != null) {
+                    nonPackageInputSuccessors.clear();
+                    toMergeSuccessors.clear();
                     for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
                         if (succSuccessor.isUnion()) {
                             if (!(unionOptimizerOn &&
@@ -171,7 +173,13 @@ public class MultiQueryOptimizerTez exte
                                         continue;
                                     }
                                 }
-                                toMergeSuccessors.add(succSuccessor);
+                                if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
+                                    // Output goes to scalar or POFRJoinTez in the union operator
+                                    // We need to ensure it is the only one to avoid parallel edges
+                                    canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
+                                } else {
+                                    toMergeSuccessors.add(succSuccessor);
+                                }
                                 List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
                                 if (unionSuccessors != null) {
                                     for (TezOperator unionSuccessor : unionSuccessors) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Feb 24 08:19:42 2017
@@ -115,11 +115,16 @@ public class ParallelismSetter extends T
                     } else if (pc.defaultParallel != -1) {
                         parallelism = pc.defaultParallel;
                     }
+                    if (parallelism == 0) {
+                        // We need to produce empty output file.
+                        // Even if user set PARALLEL 0, mapreduce has 1 reducer
+                        parallelism = 1;
+                    }
                     boolean overrideRequestedParallelism = false;
                     if (parallelism != -1
                             && autoParallelismEnabled
-                            && tezOp.isIntermediateReducer()
                             && !tezOp.isDontEstimateParallelism()
+                            && tezOp.isIntermediateReducer()
                             && tezOp.isOverrideIntermediateParallelism()) {
                         overrideRequestedParallelism = true;
                     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Fri Feb 24 08:19:42 2017
@@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex
         POLocalRearrangeTez connectingLR = null;
         PhysicalPlan rearrangePlan = from.plan;
         for (POLocalRearrangeTez lr : rearranges) {
-            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                 connectingLR = lr;
                 break;
             }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,8 @@ public class TezEstimatedParallelismClea
 
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
-        tezOp.setEstimatedParallelism(-1);
+        if (!tezOp.isDontEstimateParallelism()) {
+            tezOp.setEstimatedParallelism(-1);
+        }
     }
 }