Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC

svn commit: r1642132 [5/14] - in /pig/branches/spark: ./ bin/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/ contrib/piggybank/java/sr...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Thu Nov 27 12:49:54 2014
@@ -108,20 +108,22 @@ public class POLimit extends PhysicalOpe
             default:
                 throw new RuntimeException("Limit requires an integer parameter");
             }
-            if (variableLimit <= 0)
-                throw new RuntimeException("Limit requires a positive integer parameter");
+            if (variableLimit < 0)
+                throw new RuntimeException("Limit requires a non-negative integer parameter");
             this.setLimit(variableLimit);
         }
         Result inp = null;
         while (true) {
+            // the illustrator ignores LIMIT before the post-processing step
+            if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar >= mLimit) {
+                inp = RESULT_EOP;
+                break;
+            }
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
                 break;
 
             illustratorMarkup(inp.result, null, 0);
-            // illustrator ignore LIMIT before the post processing
-            if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar>=mLimit)
-            	inp.returnStatus = POStatus.STATUS_EOP;
 
             soFar++;
             break;
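
The hunk above moves the limit check ahead of processInput() and relaxes the
validation: a limit of zero is now accepted and only negative values are
rejected. A minimal local-mode sketch of the user-visible effect, assuming a
limit expression that evaluates to 0 at runtime (the path that sets
variableLimit); the file name and schema are placeholders:

    import java.util.Iterator;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class LimitZeroExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("A = LOAD 'input.txt' AS (x:int);");
            pig.registerQuery("G = GROUP A ALL;");
            // a runtime-evaluated limit of 0: this used to throw
            // "Limit requires a positive integer parameter"
            pig.registerQuery("N = FOREACH G GENERATE 0L AS n;");
            pig.registerQuery("B = LIMIT A N.n;");
            Iterator<Tuple> it = pig.openIterator("B");
            System.out.println(it.hasNext()); // false: zero tuples, no error
        }
    }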

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Thu Nov 27 12:49:54 2014
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,9 @@ public class POLoad extends PhysicalOper
     private boolean isTmpLoad;
     
     private long limit=-1;
+
+    private transient List<String> cacheFiles = null;
+    private transient List<String> shipFiles = null;
     
     public POLoad(OperatorKey k) {
         this(k,-1, null);
@@ -252,4 +256,20 @@ public class POLoad extends PhysicalOper
     public void setLimit(long limit) {
         this.limit = limit;
     }
+
+    public List<String> getCacheFiles() {
+        return cacheFiles;
+    }
+
+    public void setCacheFiles(List<String> cf) {
+        cacheFiles = cf;
+    }
+
+    public List<String> getShipFiles() {
+        return shipFiles;
+    }
+
+    public void setShipFiles(List<String> sf) {
+        shipFiles = sf;
+    }
 }
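
The new cacheFiles/shipFiles fields on POLoad are plain transient carriers;
nothing in this hunk populates them. A hedged sketch of plausible wiring,
assuming the loader exposes the getCacheFiles()/getShipFiles() hooks on
LoadFunc (both return null by default there); the helper itself is
hypothetical and not part of this commit:

    import java.util.List;
    import org.apache.pig.LoadFunc;

    static void copyLoaderResources(POLoad poLoad) {
        LoadFunc loader = poLoad.getLoadFunc();
        List<String> cache = loader.getCacheFiles(); // files for the distributed cache
        List<String> ship = loader.getShipFiles();   // local files to ship with the job
        poLoad.setCacheFiles(cache);
        poLoad.setShipFiles(ship);
    }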

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Nov 27 12:49:54 2014
@@ -22,11 +22,13 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -75,16 +77,16 @@ public class POPackage extends PhysicalO
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
-    private boolean firstTime = true;
-
-    private boolean useDefaultBag = false;
-
     private boolean lastBagReadOnly = true;
 
     protected Packager pkgr;
 
     protected PigNullableWritable keyWritable;
 
+    private transient boolean initialized;
+    private transient boolean useDefaultBag;
+    private transient int accumulativeBatchSize;
+
     public POPackage(OperatorKey k) {
         this(k, -1, null);
     }
@@ -189,15 +191,17 @@ public class POPackage extends PhysicalO
      */
     @Override
     public Result getNextTuple() throws ExecException {
-        if(firstTime){
-            firstTime = false;
+        if (!initialized) {
+            initialized = true;
             if (PigMapReduce.sJobConfInternal.get() != null) {
                 String bagType = PigMapReduce.sJobConfInternal.get().get(
-                        "pig.cachedbag.type");
+                        PigConfiguration.PIG_CACHEDBAG_TYPE);
                 if (bagType != null && bagType.equalsIgnoreCase("default")) {
                     useDefaultBag = true;
                 }
             }
+            accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
+
             // If multiquery, the last bag is InternalCachedBag and should not
             // set ReadOnly flag, otherwise we will materialize again to another
             // InternalCachedBag
@@ -220,9 +224,7 @@ public class POPackage extends PhysicalO
                 // create bag wrapper to pull tuples in many batches
                 // all bags have reference to the sample tuples buffer
                 // which contains tuples from one batch
-                POPackageTupleBuffer buffer = new POPackageTupleBuffer();
-                buffer.setKey(key);
-                buffer.setIterator(tupIter);
+                POPackageTupleBuffer buffer = new POPackageTupleBuffer(accumulativeBatchSize, key, tupIter);
                 for (int i = 0; i < numInputs; i++) {
                     dbs[i] = new AccumulativeBag(buffer, i);
                 }
@@ -317,29 +319,16 @@ public class POPackage extends PhysicalO
         private Object currKey;
 
         @SuppressWarnings("unchecked")
-        public POPackageTupleBuffer() {
-            batchSize = 20000;
-            if (PigMapReduce.sJobConfInternal.get() != null) {
-                String size = PigMapReduce.sJobConfInternal.get().get("pig.accumulative.batchsize");
-                if (size != null) {
-                    batchSize = Integer.parseInt(size);
-                }
-            }
-
+        public POPackageTupleBuffer(int batchSize, Object key, Iterator<NullableTuple> iter) {
+            this.batchSize = batchSize;
+            this.currKey = key;
+            this.iter = iter;
             this.bags = new List[numInputs];
             for(int i=0; i<numInputs; i++) {
-                this.bags[i] = new ArrayList<Tuple>();
+                this.bags[i] = new ArrayList<Tuple>(batchSize);
             }
         }
 
-        public void setKey(Object key) {
-            this.currKey = key;
-        }
-
-        public void setIterator(Iterator<NullableTuple> iter) {
-            this.iter = iter;
-        }
-
         @Override
         public boolean hasNextBatch() {
             return iter.hasNext();
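
The deleted POPackageTupleBuffer constructor shows where the batch size used
to be resolved; the new code asks AccumulatorOptimizerUtil once per task
instead. Going only by the removed lines (key "pig.accumulative.batchsize",
default 20000), the helper plausibly looks like the following; its actual
body is not part of this diff:

    public static int getAccumulativeBatchSize() {
        int batchSize = 20000; // the old hard-coded default
        if (PigMapReduce.sJobConfInternal.get() != null) {
            String size = PigMapReduce.sJobConfInternal.get()
                    .get("pig.accumulative.batchsize");
            if (size != null) {
                batchSize = Integer.parseInt(size);
            }
        }
        return batchSize;
    }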

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Thu Nov 27 12:49:54 2014
@@ -36,14 +36,15 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.SelfSpillBag.MemoryLimits;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.GroupingSpillable;
 import org.apache.pig.impl.util.Spillable;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 
@@ -56,7 +57,7 @@ import com.google.common.collect.Maps;
  * map. Once that map fills up or all input has been seen, results are
  * piped out into the next operator (caller of getNext()).
  */
-public class POPartialAgg extends PhysicalOperator implements Spillable {
+public class POPartialAgg extends PhysicalOperator implements Spillable, GroupingSpillable {
     private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
     private static final long serialVersionUID = 1L;
 
@@ -83,33 +84,44 @@ public class POPartialAgg extends Physic
     private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<POPartialAgg, Byte>();
 
     private static final TupleFactory TF = TupleFactory.getInstance();
-    private static final BagFactory BG = BagFactory.getInstance();
 
     private PhysicalPlan keyPlan;
     private ExpressionOperator keyLeaf;
-
     private List<PhysicalPlan> valuePlans;
     private List<ExpressionOperator> valueLeaves;
 
-    private int numRecsInRawMap = 0;
-    private int numRecsInProcessedMap = 0;
+    private transient int numRecsInRawMap;
+    private transient int numRecsInProcessedMap;
 
-    private Map<Object, List<Tuple>> rawInputMap = Maps.newHashMap();
-    private Map<Object, List<Tuple>> processedInputMap = Maps.newHashMap();
+    private transient Map<Object, List<Tuple>> rawInputMap;
+    private transient Map<Object, List<Tuple>> processedInputMap;
 
-    private boolean disableMapAgg = false;
-    private boolean sizeReductionChecked = false;
-    private boolean inputsExhausted = false;
-    private volatile boolean doSpill = false;
-    private transient MemoryLimits memLimits;
-
-    private transient boolean initialized = false;
-    private int firstTierThreshold = FIRST_TIER_THRESHOLD;
-    private int secondTierThreshold = SECOND_TIER_THRESHOLD;
-    private int sizeReduction = 1;
-    private int avgTupleSize = 0;
-    private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
-    private boolean estimatedMemThresholds = false;
+    // Transient booleans always initialize to false
+    private transient boolean initialized;
+    private transient boolean disableMapAgg;
+    private transient boolean sizeReductionChecked;
+    private transient boolean inputsExhausted;
+    private transient boolean estimatedMemThresholds;
+    // The doSpill flag is set when spilling is running or needs to run.
+    // It is set by POPartialAgg when its buffers are full after having run aggregations and
+    // the records have to be emitted to the map output.
+    // The doContingentSpill flag is set when the SpillableMemoryManager is notified
+    // by GC that the runtime is low on memory and the SpillableMemoryManager identifies
+    // the particular buffer as a good spill candidate because it is large. The contingent spill logic tries
+    // to satisfy the memory manager's request for freeing memory by aggregating data
+    // rather than just spilling records to disk.
+    private transient volatile boolean doSpill;
+    private transient volatile boolean doContingentSpill;
+    private transient volatile Object spillLock;
+
+    private transient int minOutputReduction;
+    private transient float percentUsage;
+    private transient int numRecordsToSample;
+    private transient int firstTierThreshold;
+    private transient int secondTierThreshold;
+    private transient int sizeReduction;
+    private transient int avgTupleSize;
+    private transient Iterator<Entry<Object, List<Tuple>>> spillingIterator;
 
 
     public POPartialAgg(OperatorKey k) {
@@ -118,10 +130,38 @@ public class POPartialAgg extends Physic
 
     private void init() throws ExecException {
         ALL_POPARTS.put(this, null);
-        float percent = getPercentUsageFromProp();
-        if (percent <= 0) {
+        numRecsInRawMap = 0;
+        numRecsInProcessedMap = 0;
+        rawInputMap = Maps.newHashMap();
+        processedInputMap = Maps.newHashMap();
+        minOutputReduction = DEFAULT_MIN_REDUCTION;
+        numRecordsToSample = NUM_RECS_TO_SAMPLE;
+        firstTierThreshold = FIRST_TIER_THRESHOLD;
+        secondTierThreshold = SECOND_TIER_THRESHOLD;
+        sizeReduction = 1;
+        avgTupleSize = 0;
+        percentUsage = 0.2F;
+        spillLock = new Object();
+        if (PigMapReduce.sJobConfInternal.get() != null) {
+            String usage = PigMapReduce.sJobConfInternal.get().get(
+                    PigConfiguration.PIG_CACHEDBAG_MEMUSAGE);
+            if (usage != null) {
+                percentUsage = Float.parseFloat(usage);
+            }
+            minOutputReduction = PigMapReduce.sJobConfInternal.get().getInt(
+                    PigConfiguration.PIG_EXEC_MAP_PARTAGG_MINREDUCTION, DEFAULT_MIN_REDUCTION);
+            if (minOutputReduction <= 0) {
+                LOG.info("Specified reduction is < 0 (" + minOutputReduction + "). Using default " +
+                        DEFAULT_MIN_REDUCTION);
+                minOutputReduction = DEFAULT_MIN_REDUCTION;
+            }
+        }
+        if (percentUsage <= 0) {
             LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
             disableMapAgg();
+            // Set them to true instead of adding another check for !disableMapAgg
+            sizeReductionChecked = true;
+            estimatedMemThresholds = true;
         }
         initialized = true;
         SpillableMemoryManager.getInstance().registerSpillable(this);
@@ -145,17 +185,36 @@ public class POPartialAgg extends Physic
         }
 
         while (true) {
-            if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+            if (!sizeReductionChecked && numRecsInRawMap >= numRecordsToSample) {
                 checkSizeReduction();
+                if (doContingentSpill && !doSpill) {
+                    LOG.info("Avoided emitting records during spill memory call.");
+                    doContingentSpill = false;
+                }
             }
-            if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+            if (!estimatedMemThresholds && numRecsInRawMap >= numRecordsToSample) {
                 estimateMemThresholds();
             }
+            if (doContingentSpill) {
+                // Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
+                if (!doSpill) {
+                    // SpillableMemoryManager requested a spill to reduce memory
+                    // consumption. See if we can avoid it.
+                    aggregateBothLevels(false, false);
+                    if (shouldSpill()) {
+                        startSpill(false);
+                    } else {
+                        LOG.info("Avoided emitting records during spill memory call.");
+                        doContingentSpill = false;
+                    }
+                }
+            }
             if (doSpill) {
-                startSpill();
+                startSpill(true);
                 Result result = spillResult();
                 if (result.returnStatus == POStatus.STATUS_EOP) {
                     doSpill = false;
+                    doContingentSpill = false;
                 }
                 if (result.returnStatus != POStatus.STATUS_EOP
                         || inputsExhausted) {
@@ -174,8 +233,8 @@ public class POPartialAgg extends Physic
                     if (parentPlan.endOfAllInput) {
                         // parent input is over. flush what we have.
                         inputsExhausted = true;
-                        startSpill();
                         LOG.info("Spilling last bits.");
+                        startSpill(true);
                         continue;
                     } else {
                         return EOP_RESULT;
@@ -197,15 +256,9 @@ public class POPartialAgg extends Physic
                     numRecsInRawMap += 1;
                     addKeyValToMap(rawInputMap, key, inpTuple);
 
-                    if (shouldAggregateFirstLevel()) {
-                        aggregateFirstLevel();
-                    }
-                    if (shouldAggregateSecondLevel()) {
-                        aggregateSecondLevel();
-                    }
+                    aggregateBothLevels(true, true);
                     if (shouldSpill()) {
-                        LOG.info("Starting spill.");
-                        startSpill(); // next time around, we'll start emitting.
+                        startSpill(false); // next time around, we'll start emitting.
                     }
                 }
             }
@@ -214,10 +267,10 @@ public class POPartialAgg extends Physic
 
     private void estimateMemThresholds() {
         if (!mapAggDisabled()) {
-            LOG.info("Getting mem limits; considering " + ALL_POPARTS.size() + " POPArtialAgg objects.");
-
-            float percent = getPercentUsageFromProp();
-            memLimits = new MemoryLimits(ALL_POPARTS.size(), percent);
+            LOG.info("Getting mem limits; considering " + ALL_POPARTS.size()
+                    + " POPArtialAgg objects." + " with memory percentage "
+                    + percentUsage);
+            MemoryLimits memLimits = new MemoryLimits(ALL_POPARTS.size(), percentUsage);
             int estTotalMem = 0;
             int estTuples = 0;
             for (Map.Entry<Object, List<Tuple>> entry : rawInputMap.entrySet()) {
@@ -234,30 +287,39 @@ public class POPartialAgg extends Physic
             firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
             secondTierThreshold = (int) (0.5 + totalTuples *  (1f / sizeReduction));
             LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
+            // The second tier should at least allow one tuple before it tries to aggregate.
+            // This code retains the total number of tuples in the buffer while guaranteeing
+            // the second tier has at least one tuple.
+            if (secondTierThreshold == 0) {
+                secondTierThreshold += 1;
+                firstTierThreshold -= 1;
+            }
         }
         estimatedMemThresholds = true;
     }
 
     private void checkSizeReduction() throws ExecException {
-        int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap;
-        aggregateFirstLevel();
-        aggregateSecondLevel();
-        int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
-        LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
-        int minReduction = getMinOutputReductionFromProp();
-        LOG.info("Observed reduction factor: from " + numBeforeReduction +
-                " to " + numAfterReduction +
-                " => " + numBeforeReduction / numAfterReduction + ".");
-        if ( numBeforeReduction / numAfterReduction < minReduction) {
-            LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + minReduction);
-            disableMapAgg();
+        if (!mapAggDisabled()) {
+            int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap;
+            aggregateBothLevels(false, false);
+            int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
+            LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
+            LOG.info("Observed reduction factor: from " + numBeforeReduction +
+                    " to " + numAfterReduction +
+                    " => " + numBeforeReduction / numAfterReduction + ".");
+            if ( numBeforeReduction / numAfterReduction < minOutputReduction) {
+                LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + minOutputReduction);
+                disableMapAgg();
+            }
+            sizeReduction = numBeforeReduction / numAfterReduction;
+            sizeReductionChecked = true;
         }
-        sizeReduction = numBeforeReduction / numAfterReduction;
-        sizeReductionChecked = true;
 
     }
     private void disableMapAgg() throws ExecException {
-        startSpill();
+        // Do not aggregate here: by the time disableMapAgg() is called,
+        // aggregation has already been run and the size reduction checked
+        startSpill(false);
         disableMapAgg = true;
     }
 
@@ -266,16 +328,10 @@ public class POPartialAgg extends Physic
     }
 
     private boolean shouldAggregateFirstLevel() {
-        if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
-            LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
-        }
         return (numRecsInRawMap > firstTierThreshold);
     }
 
     private boolean shouldAggregateSecondLevel() {
-        if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
-            LOG.info("Aggregating " + numRecsInProcessedMap + " secondary records.");
-        }
         return (numRecsInProcessedMap > secondTierThreshold);
     }
 
@@ -305,27 +361,13 @@ public class POPartialAgg extends Physic
         }
     }
 
-    private void startSpill() throws ExecException {
+    private void startSpill(boolean aggregate) throws ExecException {
         // If spillingIterator is not null, we are already spilling and don't need to set up.
         if (spillingIterator != null) return;
 
-        if (!rawInputMap.isEmpty()) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
-            }
-            aggregateFirstLevel();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
-            }
-        }
-        if (!processedInputMap.isEmpty()) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
-            }
-            aggregateSecondLevel();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
-            }
+        LOG.info("Starting spill.");
+        if (aggregate) {
+            aggregateBothLevels(false, true);
         }
         doSpill = true;
         spillingIterator = processedInputMap.entrySet().iterator();
@@ -374,15 +416,41 @@ public class POPartialAgg extends Physic
         return numEntriesInTarget;
     }
 
+    private void aggregateBothLevels(boolean checkThresholdForFirst,
+            boolean checkThresholdForSecond) throws ExecException {
+        // When processed map is initially empty, just aggregate first level as
+        // aggregating second level immediately would not yield anything
+        boolean aggregateSecondLevel = !processedInputMap.isEmpty();
+        if (!checkThresholdForFirst || shouldAggregateFirstLevel()) {
+            aggregateFirstLevel();
+        }
+        if (aggregateSecondLevel && (!checkThresholdForSecond || shouldAggregateSecondLevel())) {
+            aggregateSecondLevel();
+        }
+    }
+
     private void aggregateFirstLevel() throws ExecException {
+        if (rawInputMap.isEmpty()) {
+            return;
+        }
+        int rawTuples = numRecsInRawMap;
+        int processedTuples = numRecsInProcessedMap;
         numRecsInProcessedMap = aggregate(rawInputMap, processedInputMap, numRecsInProcessedMap);
         numRecsInRawMap = 0;
+        LOG.info("Aggregated " + rawTuples+ " raw tuples."
+                + " Processed tuples before aggregation = " + processedTuples
+                + ", after aggregation = " + numRecsInProcessedMap);
     }
 
     private void aggregateSecondLevel() throws ExecException {
+        if (processedInputMap.isEmpty()) {
+            return;
+        }
+        int processedTuples = numRecsInProcessedMap;
         Map<Object, List<Tuple>> newMap = Maps.newHashMapWithExpectedSize(processedInputMap.size());
         numRecsInProcessedMap = aggregate(processedInputMap, newMap, 0);
         processedInputMap = newMap;
+        LOG.info("Aggregated " + processedTuples + " processed tuples to " + numRecsInProcessedMap + " tuples");
     }
 
     private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws ExecException {
@@ -390,7 +458,14 @@ public class POPartialAgg extends Physic
         valueTuple.set(0, key);
 
         for (int i = 0; i < valuePlans.size(); i++) {
-            DataBag bag = BG.newDefaultBag();
+            DataBag bag = null;
+            if (doContingentSpill) {
+                // Don't use additional memory since we already have memory stress
+                bag = new InternalCachedBag();
+            } else {
+                // Take 10% of memory, need fine tune later
+                bag = new InternalCachedBag(1, 0.1F);
+            }
             valueTuple.set(i + 1, bag);
         }
         for (Tuple t : inpTuples) {
@@ -424,29 +499,6 @@ public class POPartialAgg extends Physic
         v.visitPartialAgg(this);
     }
 
-    private int getMinOutputReductionFromProp() {
-        int minReduction = PigMapReduce.sJobConfInternal.get().getInt(
-                PigConfiguration.PARTAGG_MINREDUCTION, DEFAULT_MIN_REDUCTION);
-        if (minReduction <= 0) {
-            LOG.info("Specified reduction is < 0 (" + minReduction + "). Using default " + DEFAULT_MIN_REDUCTION);
-            minReduction = DEFAULT_MIN_REDUCTION;
-        }
-        return minReduction;
-    }
-
-    private float getPercentUsageFromProp() {
-        float percent = 0.2F;
-        if (PigMapReduce.sJobConfInternal.get() != null) {
-            String usage = PigMapReduce.sJobConfInternal.get().get(
-                    PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
-            if (usage != null) {
-                percent = Float.parseFloat(usage);
-            }
-        }
-        return percent;
-    }
-
-
     private Result getResult(ExpressionOperator op) throws ExecException {
         Result res;
         switch (op.getResultType()) {
@@ -536,9 +588,26 @@ public class POPartialAgg extends Physic
 
     @Override
     public long spill() {
-        LOG.info("Spill triggered by SpillableMemoryManager");
-        doSpill = true;
-        return 0;
+        if (mapAggDisabled()) {
+            return 0;
+        } else {
+            LOG.info("Spill triggered by SpillableMemoryManager");
+            doContingentSpill = true;
+            synchronized(spillLock) {
+                if (!sizeReductionChecked) {
+                    numRecordsToSample = numRecsInRawMap;
+                }
+                try {
+                    while (doContingentSpill) {
+                        Thread.sleep(50); //Keeping it on the lower side for now. Tune later
+                    }
+                } catch (InterruptedException e) {
+                    LOG.warn("Interrupted exception while waiting for spill to finish", e);
+                }
+                LOG.info("Finished spill for SpillableMemoryManager call");
+                return 1;
+            }
+        }
     }
 
     @Override
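
The reworked spill() is one half of a cross-thread handshake: the
SpillableMemoryManager's notification thread raises doContingentSpill and
polls until the operator thread, inside getNextTuple(), either frees memory
by aggregating or spills, then clears the flag. A self-contained toy model
of that polling pattern (all names here are illustrative, not Pig APIs):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class ContingentSpillDemo {
        static final AtomicBoolean doContingentSpill = new AtomicBoolean(false);

        public static void main(String[] args) throws InterruptedException {
            Thread operator = new Thread(new Runnable() {
                public void run() {
                    while (!doContingentSpill.get()) {
                        // ... normally processing input tuples ...
                    }
                    System.out.println("operator: aggregated instead of spilling");
                    doContingentSpill.set(false); // satisfies the request
                }
            });
            operator.start();
            doContingentSpill.set(true);      // memory manager raises the flag
            while (doContingentSpill.get()) {
                Thread.sleep(50);             // and polls, as spill() does above
            }
            System.out.println("memory manager: request satisfied");
        }
    }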

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Thu Nov 27 12:49:54 2014
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -77,7 +78,10 @@ public class POSort extends PhysicalOper
 	private long limit;
 	public boolean isUDFComparatorUsed = false;
 	private DataBag sortedBag;
-	transient Iterator<Tuple> it;
+
+    private transient Iterator<Tuple> it;
+    private transient boolean initialized;
+    private transient boolean useDefaultBag;
 
 	public POSort(
             OperatorKey k,
@@ -256,17 +260,19 @@ public class POSort extends PhysicalOper
 
 		if (!inputsAccumulated) {
 			res = processInput();
+            if (!initialized) {
+                initialized = true;
+                if (PigMapReduce.sJobConfInternal.get() != null) {
+                    String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_SORT_TYPE);
+                    if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                        useDefaultBag = true;
+                    }
+                }
+            }
 			// by default, we create InternalSortedBag, unless user configures
-			// explicitly to use old bag
-			String bagType = null;
-	        if (PigMapReduce.sJobConfInternal.get() != null) {
-	   			bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.sort.type");
-	   	    }
-            if (bagType != null && bagType.equalsIgnoreCase("default")) {
-            	sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
-       	    } else {
-    	    	sortedBag = new InternalSortedBag(3, mComparator);
-    	    }
+            // explicitly to use old bag
+            sortedBag = useDefaultBag ? BagFactory.getInstance().newSortedBag(mComparator)
+                    : new InternalSortedBag(3, mComparator);
 
             while (res.returnStatus != POStatus.STATUS_EOP) {
 				if (res.returnStatus == POStatus.STATUS_ERR) {
@@ -377,6 +383,7 @@ public class POSort extends PhysicalOper
     }
 
 
+    @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         if(illustrator != null) {
           illustrator.getEquivalenceClasses().get(eqClassIndex).add((Tuple) in);
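
As in POPackage, the bag-type lookup in POSort now happens once per task via
a transient flag; the behavioral fork itself is unchanged. With
pig.cachedbag.sort.type set to "default" the factory's sorted bag is used
(it spills when the SpillableMemoryManager asks), otherwise InternalSortedBag
manages its own memory budget. A minimal sketch of opting into the old bag,
assuming a local-mode PigServer:

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class SortBagTypeExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("pig.cachedbag.sort.type", "default");
            PigServer pig = new PigServer(ExecType.LOCAL, props); // ORDER BY now
                                                                  // uses the default bag
        }
    }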

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Thu Nov 27 12:49:54 2014
@@ -81,6 +81,9 @@ public class POStore extends PhysicalOpe
 
     private String signature;
 
+    private transient List<String> cacheFiles = null;
+    private transient List<String> shipFiles = null;
+
     public POStore(OperatorKey k) {
         this(k, -1, null);
     }
@@ -313,4 +316,20 @@ public class POStore extends PhysicalOpe
     public void setStoreFunc(StoreFuncInterface storeFunc) {
         this.storer = storeFunc;
     }
+
+    public List<String> getCacheFiles() {
+        return cacheFiles;
+    }
+
+    public void setCacheFiles(List<String> cf) {
+        cacheFiles = cf;
+    }
+
+    public List<String> getShipFiles() {
+        return shipFiles;
+    }
+
+    public void setShipFiles(List<String> sf) {
+        shipFiles = sf;
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Thu Nov 27 12:49:54 2014
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -92,8 +93,8 @@ public class Packager implements Illustr
 
     private PackageType pkgType;
 
-    boolean firstTime = true;
-    boolean useDefaultBag = false;
+    private transient boolean initialized;
+    private transient boolean useDefaultBag;
 
     protected POPackage parent = null;
 
@@ -473,10 +474,10 @@ public class Packager implements Illustr
     }
 
     public void checkBagType() {
-        if(firstTime){
-            firstTime = false;
+        if(!initialized){
+            initialized = true;
             if (PigMapReduce.sJobConfInternal.get() != null) {
-                String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+                String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE);
                 if (bagType != null && bagType.equalsIgnoreCase("default")) {
                     useDefaultBag = true;
                 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Nov 27 12:49:54 2014
@@ -19,16 +19,20 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparable;
@@ -89,11 +93,26 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator.LoRearrangeDiscoverer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POIdentityInOutTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POShuffleTezLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -102,6 +121,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -135,6 +155,9 @@ import org.apache.tez.runtime.library.ap
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
 import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 
 /**
  * A visitor to construct DAG out of Tez plan.
@@ -146,6 +169,7 @@ public class TezDagBuilder extends TezOp
     private Map<String, LocalResource> localResources;
     private PigContext pc;
     private Configuration globalConf;
+    private long intermediateTaskInputSize;
 
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
@@ -162,6 +186,19 @@ public class TezDagBuilder extends TezOp
         } catch (IOException e) {
             throw new RuntimeException("Error while fetching delegation tokens", e);
         }
+
+        try {
+            intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(FileSystem.get(globalConf), FileLocalizer.getTemporaryResourcePath(pc));
+        } catch (Exception e) {
+            log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
+            intermediateTaskInputSize = 134217728L;
+        }
+        // At least 128MB. Else we will end up with too many tasks
+        intermediateTaskInputSize = Math.max(intermediateTaskInputSize, 134217728L);
+        intermediateTaskInputSize = Math.min(intermediateTaskInputSize,
+                globalConf.getLong(
+                        InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                        InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
     }
 
     @Override
@@ -173,8 +210,7 @@ public class TezDagBuilder extends TezOp
         Vertex to = null;
         try {
             if (!tezOp.isVertexGroup()) {
-                boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false;
-                to = newVertex(tezOp, isMap);
+                to = newVertex(tezOp);
                 dag.addVertex(to);
             } else {
                 // For union, we construct VertexGroup after iterating the
@@ -248,7 +284,8 @@ public class TezDagBuilder extends TezOp
         }
 
         return GroupInputEdge.create(from, to, edgeProperty,
-                InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
+                InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload())
+                    .setHistoryText(edgeProperty.getEdgeDestination().getHistoryText()));
     }
 
     /**
@@ -339,8 +376,9 @@ public class TezDagBuilder extends TezOp
 
         MRToTezHelper.processMRSettings(conf, globalConf);
 
-        in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-        out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+        String historyString = convertToHistoryText("", conf);
+        in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
+        out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
@@ -378,7 +416,7 @@ public class TezDagBuilder extends TezOp
                 .serialize(new byte[] { combRearrange.getKeyType() }));
     }
 
-    private Vertex newVertex(TezOperator tezOp, boolean isMap) throws IOException,
+    private Vertex newVertex(TezOperator tezOp) throws IOException,
             ClassNotFoundException, InterruptedException {
         ProcessorDescriptor procDesc = ProcessorDescriptor.create(
                 tezOp.getProcessorName());
@@ -395,12 +433,24 @@ public class TezDagBuilder extends TezOp
         Job job = new Job(payloadConf);
         payloadConf = (JobConf) job.getConfiguration();
 
-        if (tezOp.sampleOperator != null) {
-            payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.sampleOperator.getOperatorKey().toString());
+        if (tezOp.getSampleOperator() != null) {
+            payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
         }
 
-        if (tezOp.sortOperator != null) {
-            payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.sortOperator.getOperatorKey().toString());
+        if (tezOp.getSortOperator() != null) {
+            // Required by Sample Aggregation job for estimating quantiles
+            payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.getSortOperator().getOperatorKey().toString());
+            // PIG-4162: Order by/Skew Join in an intermediate stage.
+            // Increasing order-by parallelism may not be required, as it is
+            // usually followed by a limit rather than a store, but it would
+            // benefit cases like a skewed join followed by a group by.
+            if (tezOp.getSortOperator().getEstimatedParallelism() != -1
+                    && TezCompilerUtil.isIntermediateReducer(tezOp.getSortOperator())) {
+                payloadConf.setLong(
+                        InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                        intermediateTaskInputSize);
+            }
+
         }
 
         payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
@@ -443,8 +493,7 @@ public class TezDagBuilder extends TezOp
             tezOp.plan.remove(pack);
             payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
             setIntermediateOutputKeyValue(keyType, payloadConf, tezOp);
-            POShuffleTezLoad newPack;
-            newPack = new POShuffleTezLoad(pack);
+            POShuffleTezLoad newPack = new POShuffleTezLoad(pack);
             if (tezOp.isSkewedJoin()) {
                 newPack.setSkewedJoins(true);
             }
@@ -455,7 +504,7 @@ public class TezDagBuilder extends TezOp
             // backend.
             Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>();
             for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
-                if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) {
+                if (tezOp.getSampleOperator() != null && tezOp.getSampleOperator() == pred) {
                     // skip sample vertex input
                 } else {
                     String inputKey = pred.getOperatorKey().toString();
@@ -511,7 +560,7 @@ public class TezDagBuilder extends TezOp
                 }
             }
         }
-        JobControlCompiler.setOutputFormat(job);
+        setOutputFormat(job);
 
         // set parent plan in all operators. currently the parent plan is really
         // used only when POStream, POSplit are present in the plan
@@ -546,19 +595,22 @@ public class TezDagBuilder extends TezOp
         }
 
         // set various parallelism into the job conf for later analysis, PIG-2779
-        payloadConf.setInt("pig.info.reducers.default.parallel", pc.defaultParallel);
-        payloadConf.setInt("pig.info.reducers.requested.parallel", tezOp.getRequestedParallelism());
-        payloadConf.setInt("pig.info.reducers.estimated.parallel", tezOp.getEstimatedParallelism());
+        payloadConf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pc.defaultParallel);
+        payloadConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, tezOp.getRequestedParallelism());
+        payloadConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, tezOp.getEstimatedParallelism());
+
+        TezScriptState ss = TezScriptState.get();
+        ss.addVertexSettingsToConf(dag.getName(), tezOp, payloadConf);
 
         // Take our assembled configuration and create a vertex
         UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
-        procDesc.setUserPayload(userPayload);
+        procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(), payloadConf));
 
         Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(),
-                isMap ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf));
+                tezOp.isUseMRMapSettings() ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf));
 
         Map<String, String> taskEnv = new HashMap<String, String>();
-        MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, isMap);
+        MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, tezOp.isUseMRMapSettings());
         vertex.setTaskEnvironment(taskEnv);
 
         // All these classes are @InterfaceAudience.Private in Hadoop. Switch to Tez methods in TEZ-1012
@@ -571,7 +623,7 @@ public class TezDagBuilder extends TezOp
         MRApps.setupDistributedCache(globalConf, localResources);
         vertex.addTaskLocalFiles(localResources);
 
-        vertex.setTaskLaunchCmdOpts(isMap ? MRHelpers.getJavaOptsForMRMapper(globalConf)
+        vertex.setTaskLaunchCmdOpts(tezOp.isUseMRMapSettings() ? MRHelpers.getJavaOptsForMRMapper(globalConf)
                 : MRHelpers.getJavaOptsForMRReducer(globalConf));
 
         log.info("For vertex - " + tezOp.getOperatorKey().toString()
@@ -591,7 +643,8 @@ public class TezDagBuilder extends TezOp
                     DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
                           .setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
                           .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
-                          .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer())),
+                          .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer()))
+                          .setHistoryText(convertToHistoryText("", payloadConf)),
                     InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
         }
 
@@ -609,7 +662,8 @@ public class TezDagBuilder extends TezOp
 
             OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
                     MROutput.class.getName()).setUserPayload(TezUtils
-                    .createUserPayloadFromConf(outputPayLoad));
+                    .createUserPayloadFromConf(outputPayLoad))
+                    .setHistoryText(convertToHistoryText("", outputPayLoad));
             if (tezOp.getVertexGroupStores() != null) {
                 OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
                 if (vertexGroupKey != null) {
@@ -632,14 +686,16 @@ public class TezDagBuilder extends TezOp
             new PigOutputFormat().checkOutputSpecs(job);
         }
 
+        String vmPluginName = null;
+        Configuration vmPluginConf = null;
+
         // Set the right VertexManagerPlugin
         if (tezOp.getEstimatedParallelism() != -1) {
             if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
                 // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
                 // to decrease/increase parallelism of sorting vertex dynamically
                 // based on the numQuantiles calculated by sample aggregation vertex
-                vertex.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
-                        PartitionerDefinedVertexManager.class.getName()));
+                vmPluginName = PartitionerDefinedVertexManager.class.getName();
                 log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
             } else {
                 boolean containScatterGather = false;
@@ -655,24 +711,50 @@ public class TezDagBuilder extends TezOp
                 if (containScatterGather && !containCustomPartitioner) {
                     // Use auto-parallelism feature of ShuffleVertexManager to dynamically
                     // reduce the parallelism of the vertex
-                    VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(
-                            ShuffleVertexManager.class.getName());
-                    Configuration vmPluginConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+                    vmPluginName = ShuffleVertexManager.class.getName();
+                    vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
                     vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
-                    if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                            InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)!=
+                    if (stores.size() <= 0) {
+                        // Intermediate reduce. Set the bytes per reducer to be block size.
+                        vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+                                        intermediateTaskInputSize);
+                    } else if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
                                     InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
                         vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
                                 vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                                         InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
                     }
-                    vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
-                    vertex.setVertexManagerPlugin(vmPluginDescriptor);
                     log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
                 }
             }
         }
-
+        if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
+            if (tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName())) {
+                // Setting SRC_FRACTION to 0.00001 so that even if there are 100K source tasks,
+                // the limit vertex starts as soon as one source task finishes.
+                // If the limit is part of a group by or join (merged because their
+                // parallelism is 1), we should leave the configuration at the defaults.
+                vmPluginName = ShuffleVertexManager.class.getName();
+                vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+                vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, "0.00001");
+                vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, "0.00001");
+                log.info("Set " + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 for limit vertex " + tezOp.getOperatorKey().toString());
+            }
+        }
+        // else if(tezOp.isLimitAfterSort())
+        // TODO: PIG-4049 If standalone Limit we need a new VertexManager or new input
+        // instead of ShuffledMergedInput. For limit part of the sort (order by parallel 1) itself
+        // need to enhance PartitionerDefinedVertexManager
+
+        if (vmPluginName != null) {
+            VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName);
+            if (vmPluginConf != null) {
+                vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf))
+                    .setHistoryText(convertToHistoryText(vmPluginName, vmPluginConf));
+            }
+            vertex.setVertexManagerPlugin(vmPluginDescriptor);
+        }
         // Reset udfcontext jobconf. It is not supposed to be set in the front end
         UDFContext.getUDFContext().addJobConf(null);
         return vertex;
@@ -943,4 +1025,48 @@ public class TezDagBuilder extends TezOp
         conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS,
                 comparatorClass);
     }
+
+    private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
+        // The OutputFormat we report to Hadoop is always PigOutputFormat, which
+        // can be wrapped with LazyOutputFormat provided it is supported by the
+        // Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set.
+        if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+            try {
+                Class<?> clazz = PigContext
+                        .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+                Method method = clazz.getMethod("setOutputFormatClass",
+                        org.apache.hadoop.mapreduce.Job.class, Class.class);
+                method.invoke(null, job, PigOutputFormatTez.class);
+            } catch (Exception e) {
+                job.setOutputFormatClass(PigOutputFormatTez.class);
+                log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+                        + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+            }
+        } else {
+            job.setOutputFormatClass(PigOutputFormatTez.class);
+        }
+    }
+
+    // Borrowed from TezUtils.convertToHistoryText since it is not part of Tez 0.5.2
+    public static String convertToHistoryText(String description, Configuration conf) throws IOException {
+        // Add a version if this serialization is changed
+        JSONObject jsonObject = new JSONObject();
+        try {
+            if (description != null && !description.isEmpty()) {
+                jsonObject.put("desc", description);
+            }
+            if (conf != null) {
+                JSONObject confJson = new JSONObject();
+                Iterator<Entry<String, String>> iter = conf.iterator();
+                while (iter.hasNext()) {
+                    Entry<String, String> entry = iter.next();
+                    confJson.put(entry.getKey(), entry.getValue());
+                }
+                jsonObject.put("config", confJson);
+            }
+        } catch (JSONException e) {
+            throw new IOException("Error when trying to convert description/conf to JSON", e);
+        }
+        return jsonObject.toString();
+    }
 }

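A note on the LIMIT scheduling change above: ShuffleVertexManager's min/max
source fractions control what share of upstream tasks must finish before
downstream tasks are scheduled, so driving both to 0.00001 lets a LIMIT
vertex start after the first source task completes. A minimal sketch of that
configuration, assuming the Tez 0.5 API used in this patch (the Vertex being
configured is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.common.TezUtils;
    import org.apache.tez.dag.api.Vertex;
    import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
    import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;

    public class LimitSlowStartSketch {
        // Schedule the vertex once ~0.001% of its source tasks have
        // completed, i.e. effectively after the first one finishes.
        static void configureEarlyStart(Vertex limitVertex) throws IOException {
            Configuration vmConf = new Configuration(false);
            vmConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, "0.00001");
            vmConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, "0.00001");
            VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor
                    .create(ShuffleVertexManager.class.getName())
                    .setUserPayload(TezUtils.createUserPayloadFromConf(vmConf));
            limitVertex.setVertexManagerPlugin(desc);
        }
    }
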
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java Thu Nov 27 12:49:54 2014
@@ -24,8 +24,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
-import org.apache.pig.tools.pigstats.tez.TezStats;
 
 public class TezExecutionEngine extends HExecutionEngine {
 
@@ -43,6 +43,6 @@ public class TezExecutionEngine extends 
 
     @Override
     public PigStats instantiatePigStats() {
-        return new TezStats(pigContext);
+        return new TezPigScriptStats(pigContext);
     }
 }

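The TezStats to TezPigScriptStats swap above pairs with the TezJob changes
below: instead of TezJob exposing raw vertex-counter maps, the stats object
is handed to the job and populated when the DAG completes. A hypothetical
wiring sketch, based only on the signatures visible in this commit (the
compiler and plan arguments are assumed to come from the launcher):

    import org.apache.pig.backend.hadoop.executionengine.tez.TezJob;
    import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
    import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
    import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;

    public class StatsWiringSketch {
        // Hypothetical launcher-side wiring: create the script stats,
        // hand them to the job, and let run() push counters in via
        // stats.accumulateStats(job) once the DAG completes.
        static void launch(TezJobCompiler compiler, TezPlanContainerNode node,
                TezPlanContainer container, PigContext pc) throws Exception {
            TezPigScriptStats stats = new TezPigScriptStats(pc);
            TezJob job = compiler.compile(node, container);
            job.setPigStats(stats);
            job.run();
        }
    }
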
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Thu Nov 27 12:49:54 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -32,14 +31,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.CounterGroup;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
@@ -64,19 +62,39 @@ public class TezJob implements Runnable 
     private TezClient tezClient;
     private boolean reuseSession;
     private TezCounters dagCounters;
-    // Vertex, CounterGroup, Counter, Value
-    private Map<String, Map<String, Map<String, Long>>> vertexCounters;
+
     // Timer for DAG status reporter
     private Timer timer;
+    private TezJobConfig tezJobConf;
+    private TezPigScriptStats pigStats;
 
-    public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
-            throws IOException {
+    public TezJob(TezConfiguration conf, DAG dag,
+            Map<String, LocalResource> requestAMResources,
+            int estimatedTotalParallelism) throws IOException {
         this.conf = conf;
         this.dag = dag;
         this.requestAMResources = requestAMResources;
-        this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true);
+        this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
         this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-        this.vertexCounters = Maps.newHashMap();
+        tezJobConf = new TezJobConfig(estimatedTotalParallelism);
+    }
+
+    static class TezJobConfig {
+
+        private int estimatedTotalParallelism = -1;
+
+        public TezJobConfig(int estimatedTotalParallelism) {
+            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        }
+
+        public int getEstimatedTotalParallelism() {
+            return estimatedTotalParallelism;
+        }
+
+        public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
+            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        }
+
     }
 
     public DAG getDAG() {
@@ -84,7 +102,7 @@ public class TezJob implements Runnable 
     }
 
     public String getName() {
-        return dag == null ? "" : dag.getName();
+        return dag.getName();
     }
 
     public Configuration getConfiguration() {
@@ -103,14 +121,6 @@ public class TezJob implements Runnable 
         return dagCounters;
     }
 
-    public Map<String, Map<String, Long>> getVertexCounters(String group) {
-        return vertexCounters.get(group);
-    }
-
-    public Map<String, Long> getVertexCounters(String group, String name) {
-        return vertexCounters.get(group).get(name);
-    }
-
     public float getDAGProgress() {
         Progress p = dagStatus.getDAGProgress();
         return p == null ? 0 : (float)p.getSucceededTaskCount() / (float)p.getTotalTaskCount();
@@ -126,10 +136,28 @@ public class TezJob implements Runnable 
         return vertexProgress;
     }
 
+    public VertexStatus getVertexStatus(String vertexName) {
+        VertexStatus vs = null;
+        try {
+            vs = dagClient.getVertexStatus(vertexName, statusGetOpts);
+        } catch (Exception e) {
+            // Don't fail the job even if vertex status couldn't
+            // be retrieved.
+            log.warn("Cannot retrieve status for vertex " + vertexName, e);
+        }
+        return vs;
+    }
+
+    public void setPigStats(TezPigScriptStats pigStats) {
+        this.pigStats = pigStats;
+    }
+
     @Override
     public void run() {
+        UDFContext udfContext = UDFContext.getUDFContext();
         try {
-            tezClient = TezSessionManager.getClient(conf, requestAMResources, dag.getCredentials());
+            tezClient = TezSessionManager.getClient(conf, requestAMResources,
+                    dag.getCredentials(), tezJobConf);
             log.info("Submitting DAG " + dag.getName());
             dagClient = tezClient.submitDAG(dag);
             appId = tezClient.getAppMasterApplicationId();
@@ -145,7 +173,7 @@ public class TezJob implements Runnable 
 
         timer = new Timer();
         timer.schedule(new DAGStatusReporter(), 1000, conf.getLong(
-                PigConfiguration.TEZ_DAG_STATUS_REPORT_INTERVAL, 10) * 1000);
+                PigConfiguration.PIG_TEZ_DAG_STATUS_REPORT_INTERVAL, 20) * 1000);
 
         while (true) {
             try {
@@ -156,10 +184,18 @@ public class TezJob implements Runnable 
             }
 
             if (dagStatus.isCompleted()) {
+                // Restore the UDFContext for tez_local mode, where PigProcessor destroys it
+                UDFContext.setUdfContext(udfContext);
+
+                log.info("DAG Status: " + dagStatus);
                 dagCounters = dagStatus.getDAGCounters();
-                collectVertexCounters();
                 TezSessionManager.freeSession(tezClient);
                 try {
+                    pigStats.accumulateStats(this);
+                } catch (Exception e) {
+                    log.warn("Exception while gathering stats", e);
+                }
+                try {
                     if (!reuseSession) {
                         TezSessionManager.stopSession(tezClient);
                     }
@@ -182,36 +218,17 @@ public class TezJob implements Runnable 
     }
 
     private class DAGStatusReporter extends TimerTask {
+
+        private final String LINE_SEPARATOR = System.getProperty("line.separator");
+
         @Override
         public void run() {
-            log.info("DAG Status: " + dagStatus);
-        }
-    }
-
-    private void collectVertexCounters() {
-        for (Vertex v : dag.getVertices()) {
-            String name = v.getName();
-            try {
-                VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts);
-                TezCounters counters = s.getVertexCounters();
-                Map<String, Map<String, Long>> grpCounters = Maps.newHashMap();
-                Iterator<CounterGroup> grpIt = counters.iterator();
-                while (grpIt.hasNext()) {
-                    CounterGroup grp = grpIt.next();
-                    Iterator<TezCounter> cntIt = grp.iterator();
-                    Map<String, Long> cntMap = Maps.newHashMap();
-                    while (cntIt.hasNext()) {
-                        TezCounter cnt = cntIt.next();
-                        cntMap.put(cnt.getName(), cnt.getValue());
-                    }
-                    grpCounters.put(grp.getName(), cntMap);
-                }
-                vertexCounters.put(name, grpCounters);
-            } catch (Exception e) {
-                // Don't fail the job even if vertex counters couldn't
-                // be retrieved.
-                log.info("Cannot retrieve counters for vertex " + name, e);
-            }
+            if (dagStatus == null) return;
+            String msg = "status=" + dagStatus.getState()
+              + ", progress=" + dagStatus.getDAGProgress()
+              + ", diagnostics="
+              + StringUtils.join(dagStatus.getDiagnostics(), LINE_SEPARATOR);
+            log.info("DAG Status: " + msg);
         }
     }
 

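Condensing the TezJob.run() flow above: the job polls the DAGClient, and once
the DAG completes it grabs the DAG-level counters, frees the session, and
feeds the stats object. A simplified sketch, assuming the Tez 0.5 client API;
the timer-driven status reporter and error handling are omitted:

    import java.util.EnumSet;

    import org.apache.tez.common.counters.TezCounters;
    import org.apache.tez.dag.api.client.DAGClient;
    import org.apache.tez.dag.api.client.DAGStatus;
    import org.apache.tez.dag.api.client.StatusGetOpts;

    public class TezJobLifecycleSketch {
        // Poll until the DAG completes, then return its counters.
        static TezCounters waitForCompletion(DAGClient dagClient) throws Exception {
            EnumSet<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
            DAGStatus status;
            do {
                Thread.sleep(1000); // TezJob polls in a loop; logging is timer-driven
                status = dagClient.getDAGStatus(opts);
            } while (!status.isCompleted());
            return status.getDAGCounters();
        }
    }
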
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Thu Nov 27 12:49:54 2014
@@ -32,6 +32,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
 import org.apache.pig.impl.PigContext;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -43,7 +46,6 @@ import org.apache.tez.dag.api.TezConfigu
  */
 public class TezJobCompiler {
     private static final Log log = LogFactory.getLog(TezJobCompiler.class);
-    private static int dagIdentifier = 0;
 
     private PigContext pigContext;
     private TezConfiguration tezConf;
@@ -53,24 +55,22 @@ public class TezJobCompiler {
         this.tezConf = new TezConfiguration(conf);
     }
 
-    public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
+    public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
             throws IOException, YarnException {
-        String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
-        DAG tezDag = DAG.create(jobName + "-" + dagIdentifier);
-        dagIdentifier++;
+        DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
         tezDag.setCredentials(new Credentials());
-        TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
         dagBuilder.visit();
         return tezDag;
     }
 
-    public TezJob compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer)
+    public TezJob compile(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
             throws JobCreationException {
         TezJob job = null;
         try {
             // A single Tez job always packs only one Tez plan. We track the
             // Tez job asynchronously to exploit parallel execution opportunities.
-            job = getJob(tezPlan, planContainer);
+            job = getJob(tezPlanNode, planContainer);
         } catch (JobCreationException jce) {
             throw jce;
         } catch(Exception e) {
@@ -82,11 +82,12 @@ public class TezJobCompiler {
         return job;
     }
 
-    private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
+    private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
             throws JobCreationException {
         try {
             Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
             localResources.putAll(planContainer.getLocalResources());
+            TezOperPlan tezPlan = tezPlanNode.getTezOperPlan();
             localResources.putAll(tezPlan.getExtraResources());
             String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
             if (shipFiles != null) {
@@ -101,8 +102,11 @@ public class TezJobCompiler {
                     TezResourceManager.getInstance().addTezResource(new Path(new URI(file.trim())).toUri());
                 }
             }
-            DAG tezDag = buildDAG(tezPlan, localResources);
-            return new TezJob(tezConf, tezDag, localResources);
+            for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
+                log.info("Local resource: " + entry.getKey());
+            }
+            DAG tezDag = buildDAG(tezPlanNode, localResources);
+            return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
         } catch (Exception e) {
             int errCode = 2017;
             String msg = "Internal error creating job configuration.";