You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2012/08/30 07:10:35 UTC

svn commit: r1378801 [1/2] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/builtin/ src/or...

Author: dvryaboy
Date: Thu Aug 30 05:10:34 2012
New Revision: 1378801

URL: http://svn.apache.org/viewvc?rev=1378801&view=rev
Log:
PIG-2888: Improve performance of POPartialAgg

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/trunk/src/org/apache/pig/builtin/Distinct.java
    pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
    pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
    pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java
    pig/trunk/test/org/apache/pig/test/TestPOCast.java
    pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
    pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 30 05:10:34 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2888: Improve performance of POPartialAgg (dvryaboy)
+
 PIG-2708: split MiniCluster based tests out of org.apache.pig.test.TestInputOutputFileValidator (analog.sony via daijy)
 
 PIG-2890: Revert PIG-2578 (dvryaboy)

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Thu Aug 30 05:10:34 2012
@@ -298,6 +298,8 @@
     </path> 
 
     <path id="test.classpath">
+        <!-- need to put this first, otherwise junit-3 testcases can break -->
+        <pathelement location="${ivy.lib.dir}/junit-3.8.1.jar"/>
         <pathelement location="${build.classes}"/>
         <pathelement location="${test.src.dir}"/>
         <pathelement location="contrib/piggybank/java/piggybank.jar"/>
@@ -783,6 +785,20 @@
     </target>
 
     <!-- ================================================================== -->
+    <!-- Make pigperf.jar                                                   -->
+    <!-- ================================================================== -->
+    <target name="pigperf" depends="compile-test" description="Create pigperf.jar">
+        <jar jarfile="pigperf.jar">
+            <fileset dir="${test.build.dir}/classes">
+                <include name="org/apache/pig/test/pigmix/**"/>
+                <include name="org/apache/pig/test/utils/datagen/*"/>
+                <include name="org/apache/pig/test/udf/storefunc/*"/>
+            </fileset>
+            <zipfileset src="${lib.dir}/sdsuLibJKD12.jar" />
+        </jar>
+    </target>
+
+    <!-- ================================================================== -->
     <!-- Run unit tests                                                     -->
     <!-- ================================================================== -->
     <target name="test-core" depends="compile-test,jar-withouthadoop,debugger.check" description="Run full set of unit tests">

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Thu Aug 30 05:10:34 2012
@@ -24,8 +24,31 @@ package org.apache.pig;
 public class PigConfiguration {
 
     /**
+     * Controls the fraction of total memory that is allowed to be used by
+     * cached bags. Default is 0.2.
+     */
+    public static final String PROP_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
+
+    /**
+     * Controls whether in-mapper partial aggregation is turned on.
+     */
+    public static final String PROP_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
+
+    /**
+     * Controls the minimum reduction in-mapper Partial Aggregation should achieve in order
+     * to stay on. If after a period of observation this reduction is not achieved,
+     * in-mapper aggregation will be turned off and a message logged to that effect.
+     */
+    public static final String PARTAGG_MINREDUCTION = "pig.exec.mapPartAgg.minReduction";
+
+    /**
      * Controls whether execution time of Pig UDFs should be tracked.
      * This feature uses counters; use judiciously.
      */
     public static final String TIME_UDFS_PROP = "pig.udf.profile";
+
+    /**
+     * Turns off use of combiners in MapReduce jobs produced by Pig.
+     */
+    public static final String PROP_NO_COMBINER = "pig.exec.nocombiner";
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Aug 30 05:10:34 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.RunningJ
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.PigRunner.ReturnCode;
@@ -80,9 +81,6 @@ public class MapReduceLauncher extends L
     public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
         "mapreduce.fileoutputcommitter.marksuccessfuljobs";
 
-    public static final String PROP_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
-
-    
     private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
  
     //used to track the exception thrown by the job control which is run in a separate thread
@@ -561,10 +559,10 @@ public class MapReduceLauncher extends L
             pc.getProperties().getProperty(
                     "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
         
-        String prop = pc.getProperties().getProperty("pig.exec.nocombiner");
+        String prop = pc.getProperties().getProperty(PigConfiguration.PROP_NO_COMBINER);
         if (!pc.inIllustrator && !("true".equals(prop)))  {
             boolean doMapAgg = 
-                    Boolean.valueOf(pc.getProperties().getProperty(PROP_EXEC_MAP_PARTAGG,"false"));
+                    Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG,"false"));
             CombinerOptimizer co = new CombinerOptimizer(plan, doMapAgg);
             co.visit();
             //display the warning message(s) from the CombinerOptimizer

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Thu Aug 30 05:10:34 2012
@@ -18,13 +18,15 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.WeakHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -34,180 +36,148 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.SelfSpillBag.MemoryLimits;
-import org.apache.pig.data.SizeUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import com.google.common.collect.Maps;
 
 /**
- * Do partial aggregation in map plan. It uses a hash-map to aggregate. If
- * consecutive records have same key, it will aggregate those without adding
- * them to the hash-map. As future optimization, the use of hash-map could be
- * disabled when input data is sorted on group-by keys
+ * Do partial aggregation in map plan. Inputs are buffered up in
+ * a hashmap until a threshold is reached; then the combiner functions
+ * are fed these buffered up inputs, and results stored in a secondary
+ * map. Once that map fills up or all input has been seen, results are
+ * piped out into the next operator (caller of getNext()).
  */
 public class POPartialAgg extends PhysicalOperator {
-
-    public static final String PROP_PARTAGG_MINREDUCTION = "pig.exec.mapPartAgg.minReduction";
-
-    private static final Log log = LogFactory.getLog(POPartialAgg.class);
+    private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
     private static final long serialVersionUID = 1L;
 
-    private PhysicalPlan keyPlan;
-    private ExpressionOperator keyLeaf;
-
-    private List<PhysicalPlan> valuePlans;
-    private List<ExpressionOperator> valueLeaves;
     private static final Result ERR_RESULT = new Result();
     private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
             null);
 
-    // run time variables
-    private transient Object currentKey = null;
-    private transient Map<Object, Tuple> aggMap;
-    // tuple of the format - (null(key),bag-val1,bag-val2,...)
-    // attach this to the plans with algebraic udf before evaluating the plans
-    private transient Tuple valueTuple = null;
+    // number of records to sample to determine average size used by each
+    // entry in hash map and average seen reduction
+    private static final int NUM_RECS_TO_SAMPLE = 10000;
+
+    // We want to avoid massive ArrayList copies as they get big.
+    // ArrayLists grow to prevSize + prevSize/2. Given the default initial size of 10,
+    // 9369 is the capacity of the array after 17 such resizings. This seems like a sufficiently
+    // large value to trigger spilling/aggregation instead of paying for yet another data
+    // copy.
+    private static final int MAX_LIST_SIZE = 9368;
 
-    private boolean isFinished = false;
+    private static final int DEFAULT_MIN_REDUCTION = 10;
 
-    private transient Iterator<Tuple> mapDumpIterator;
-    private transient int numToDump;
+    // TODO: these are temporary. The real thing should be using memory usage estimation.
+    private static final int FIRST_TIER_THRESHOLD = 20000;
+    private static final int SECOND_TIER_THRESHOLD = FIRST_TIER_THRESHOLD / DEFAULT_MIN_REDUCTION;
 
-    // maximum bag size of currentValues cached before aggregation is done
-    private static final int MAX_SIZE_CURVAL_CACHE = 1024;
+    private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<POPartialAgg, Byte>();
 
-    // number of records to sample to determine average size used by each
-    // entry in hash map
-    private static final int NUM_RESRECS_TO_SAMPLE_SZ_ESTIMATE = 100;
+    private static final TupleFactory TF = TupleFactory.getInstance();
+    private static final BagFactory BG = BagFactory.getInstance();
 
-    // params for auto disabling map aggregation
-    private static final int NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION = 1000;
+    private PhysicalPlan keyPlan;
+    private ExpressionOperator keyLeaf;
 
-    private static final int DEFAULT_MIN_REDUCTION = 10;
+    private List<PhysicalPlan> valuePlans;
+    private List<ExpressionOperator> valueLeaves;
+
+    private int numRecsInRawMap = 0;
+    private int numRecsInProcessedMap = 0;
+
+    private Map<Object, List<Tuple>> rawInputMap = Maps.newHashMap();
+    private Map<Object, List<Tuple>> processedInputMap = Maps.newHashMap();
 
     private boolean disableMapAgg = false;
-    private int num_inp_recs;
     private boolean sizeReductionChecked = false;
+    private boolean inputsExhausted = false;
+    private boolean doSpill = false;
+    private transient MemoryLimits memLimits;
 
-    private transient int maxHashMapSize;
+    private transient boolean initialized = false;
+    private int firstTierThreshold = FIRST_TIER_THRESHOLD;
+    private int secondTierThreshold = SECOND_TIER_THRESHOLD;
+    private int sizeReduction;
+    private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
+    private boolean estimatedMemThresholds;
 
-    private transient TupleFactory tupleFact;
-    private transient MemoryLimits memLimits;
 
     public POPartialAgg(OperatorKey k) {
         super(k);
     }
 
-    @Override
-    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
-        // combiner optimizer does not get invoked if the plan is being executed
-        // under illustrate, so POPartialAgg should not get used in that case
-        throw new UnsupportedOperationException();
+    private void init() throws ExecException {
+        ALL_POPARTS.put(this, null);
+        float percent = getPercentUsageFromProp();
+        if (percent <= 0) {
+            LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
+            disableMapAgg();
     }
-
-    @Override
-    public void visit(PhyPlanVisitor v) throws VisitorException {
-        v.visitPartialAgg(this);
+        initialized = true;
     }
 
     @Override
-    public Result getNext(Tuple t) throws ExecException {
+    public Result getNext(Tuple __ignored__) throws ExecException {
+        // accumulate tuples from processInput in rawInputMap.
+        // when the maps grow to mem limit, go over each item in map, and call
+        // combiner aggs on each collection.
+        // Store the results into processedInputMap. Clear out rawInputMap.
+        // Mem usage is updated every time we modify either of the maps.
+        // When processedInputMap is >= 20% of allotted memory, run aggs on it,
+        // and output the results as returns of successive calls of this method.
+        // Then reset processedInputMap.
+        // The fact that we are in the latter stage is communicated via the doSpill
+        // flag.
 
-        if (disableMapAgg) {
-            // map aggregation has been automatically disabled
-            if (mapDumpIterator != null) {
-                // there are some accumulated entries in map to be dumped
-                return getNextResFromMap();
-            } else {
-                Result inp = processInput();
-                if (disableMapAgg) {
-                    // the in-map partial aggregation is an optional step, just
-                    // like the combiner.
-                    // act as if this operator was never there, by just 
-                    // returning the input
-                    return inp;
+        if (!initialized && !ALL_POPARTS.containsKey(this)) {
+            init();
                 }
-            }
-        }
 
-        if (mapDumpIterator != null) {
-            // if this iterator is not null, we are process of dumping records
-            // from the map
-            if (isFinished) {
-                return getNextResFromMap();
-            } else if (numToDump > 0) {
-                // there are some tuples yet to be dumped, to free memory
-                --numToDump;
-                return getNextResFromMap();
-            } else {
-                mapDumpIterator = null;
+        while (true) {
+            if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+                checkSizeReduction();
             }
+            if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+                estimateMemThresholds();
         }
-
-        if (isFinished) {
-            // done with dumping all records
-            return new Result(POStatus.STATUS_EOP, null);
-        }
-
-        while (true) {
-            //process each input until EOP
+            if (doSpill) {
+                Result result = spillResult();
+                if (result == EOP_RESULT) {
+                    doSpill = false;
+            }
+                if (result != EOP_RESULT || inputsExhausted) {
+                    return result;
+        }
+        }
+            if (mapAggDisabled()) {
+                // disableMapAgg() sets doSpill, so we can't get here while there are still contents in the buffered maps.
+                // if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
+                return processInput();
+            } else {
             Result inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_ERR) {
-                // error
                 return inp;
-            }
-            if (inp.returnStatus == POStatus.STATUS_EOP) {
+                } else if (inp.returnStatus == POStatus.STATUS_EOP) {
                 if (parentPlan.endOfAllInput) {
-                    // it is actually end of all input
-                    // start dumping results
-                    isFinished = true;
-                    logCapacityOfAggMap();
-                    // check if there was ANY input
-                    if (valueTuple == null) {
-                        return EOP_RESULT;
-                    }
-
-                    // first return agg for currentKey
-                    Result output = getOutput();
-                    aggMap.remove(currentKey);
-
-                    mapDumpIterator = aggMap.values().iterator();
-
-                    // free the variables not needed anymore
-                    currentKey = null;
-                    valueTuple = null;
-
-                    return output;
+                        // parent input is over. flush what we have.
+                        inputsExhausted = true;
+                        startSpill();
+                        LOG.info("Spilling last bits.");
+                        continue;
                 } else {
-                    // return EOP
-                    return inp;
-                }
+                        return EOP_RESULT;
             }
-            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                } else if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
-            }
-
-            // check if this operator is doing a good job of reducing the number
-            // of records going to output to justify the costs of itself
-            // if not , disable map partial agg
-            if ((!sizeReductionChecked)) {
-                checkSizeReduction();
-
-                if (disableMapAgg) {
-                    // in-map partial aggregation just got disabled
-                    // return the new input record, it has not been aggregated
-                    return inp;
-                }
-            }
-
-            // we have some real input data
-
-            // setup input for keyplan
+                } else {
+                    // add this input to map.
             Tuple inpTuple = (Tuple) inp.result;
             keyPlan.attachInput(inpTuple);
 
@@ -218,134 +188,208 @@ public class POPartialAgg extends Physic
             }
             Object key = keyRes.result;
             keyPlan.detachInput();
+                    numRecsInRawMap += 1;
+                    addKeyValToMap(rawInputMap, key, inpTuple);
 
-            if (valueTuple == null) {
-                // this is the first record the operator is seeing
-                // do initializations
-                init(key, inpTuple);
-                continue;
-            } else {
-                // check if key changed
-                boolean keyChanged = (currentKey != null && key == null)
-                        || ((key != null) && (!key.equals(currentKey)));
-
-                if (!keyChanged) {
-                    addToCurrentValues(inpTuple);
-
-                    // if there are enough number of values,
-                    // aggregate the values accumulated in valueTuple
-                    if (((DefaultDataBag) valueTuple.get(1)).size() >= MAX_SIZE_CURVAL_CACHE) {
-                        // not a key change, so store the agg result back to bag
-                        aggregateCurrentValues();
+                    if (shouldAggregateFirstLevel()) {
+                        aggregateFirstLevel();
+                    }
+                    if (shouldAggregateSecondLevel()) {
+                        aggregateSecondLevel();
                     }
-                    continue;
-                } else {// new key
+                    if (shouldSpill()) {
+                        LOG.info("Starting spill.");
+                        startSpill(); // next time around, we'll start emitting.
+                    }
+                }
+            }
+        }
+    }
 
-                    // compute aggregate for currentKey
-                    Result output = getOutput();
-                    if (output.returnStatus != POStatus.STATUS_OK) {
-                        return ERR_RESULT;
+    private void estimateMemThresholds() {
+        if (!mapAggDisabled()) {
+            LOG.info("Getting mem limits; considering " + ALL_POPARTS.size() + " POPArtialAgg objects.");
+
+            float percent = getPercentUsageFromProp();
+            memLimits = new MemoryLimits(ALL_POPARTS.size(), percent);
+            int estTotalMem = 0;
+            int estTuples = 0;
+            for (Map.Entry<Object, List<Tuple>> entry : rawInputMap.entrySet()) {
+                for (Tuple t : entry.getValue()) {
+                    estTuples += 1;
+                    int mem = (int) t.getMemorySize();
+                    estTotalMem += mem;
+                    memLimits.addNewObjSize(mem);
                     }
-                    
-                    // set new current key, value
-                    currentKey = key;
-                    resetCurrentValues();
-                    addToCurrentValues(inpTuple);
-
-                    // get existing result from map (if any) and add it to
-                    // current values
-                    Tuple existingResult = aggMap.get(key);
-
-                    // existingResult will be null only if key is absent in
-                    // aggMap
-                    if (existingResult != null) {
-                        addToCurrentValues(existingResult);
+            }
+            int totalTuples = memLimits.getCacheLimit();
+            LOG.info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
+            firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
+            secondTierThreshold = (int) (0.5 + totalTuples *  (1f / sizeReduction));
+            LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
+        }
+        estimatedMemThresholds = true;
                     }
+                    
+    private void checkSizeReduction() throws ExecException {
+        int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap;
+        aggregateFirstLevel();
+        aggregateSecondLevel();
+        int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
+        LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
+        int minReduction = getMinOutputReductionFromProp();
+        LOG.info("Observed reduction factor: from " + numBeforeReduction +
+                " to " + numAfterReduction +
+                " => " + numBeforeReduction / numAfterReduction + ".");
+        if ( numBeforeReduction / numAfterReduction < minReduction) {
+            LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + minReduction);
+            disableMapAgg();
+        }
+        sizeReduction = numBeforeReduction / numAfterReduction;
+        sizeReductionChecked = true;
+
+    }
+    private void disableMapAgg() throws ExecException {
+        startSpill();
+        disableMapAgg = true;
+    }
 
-                    // storing a new entry in the map, so update estimate of
-                    // num of entries that will fit into the map
-                    if (memLimits.getNumObjectsSizeAdded() < NUM_RESRECS_TO_SAMPLE_SZ_ESTIMATE) {
-                        updateMaxMapSize(output.result);
+    private boolean mapAggDisabled() {
+        return disableMapAgg;
                     }
 
-                    // check if it is time to dump some aggs from the hashmap
-                    if (aggMap.size() >= maxHashMapSize) {
-                        // dump 10% of max hash size because dumping just one
-                        // record at a time might result in most group key being
-                        // dumped (depending on hashmap implementation)
-                        // TODO: dump the least recently/frequently used entries
-                        numToDump = maxHashMapSize / 10;
-                        mapDumpIterator = aggMap.values().iterator();
+    private boolean shouldAggregateFirstLevel() {
+        if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
+            LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
+        }
+        return (numRecsInRawMap > firstTierThreshold);
+                    }
 
-                        return output;
+    private boolean shouldAggregateSecondLevel() {
+        if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
+            LOG.info("Aggregating " + numRecsInProcessedMap + " secondary records.");
+        }
+        return (numRecsInProcessedMap > secondTierThreshold);
+    }
+
+    private boolean shouldSpill() {
+        // is this always the same as shouldAgg?
+        return shouldAggregateSecondLevel();
+    }
+
+    private void addKeyValToMap(Map<Object, List<Tuple>> map,
+            Object key, Tuple inpTuple) throws ExecException {
+        List<Tuple> value = map.get(key);
+        if (value == null) {
+            value = new ArrayList<Tuple>();
+            map.put(key, value);
+        }
+       value.add(inpTuple);
+       if (value.size() >= MAX_LIST_SIZE) {
+           boolean isFirst = (map == rawInputMap);
+           if (LOG.isInfoEnabled()){
+               LOG.info("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
+           }
+           if (isFirst) {
+               aggregateRawRow(key);
                     } else {
-                        // there is space available in the hashmap, store the
-                        // output there
-                        addOutputToAggMap(output);
+               aggregateSecondLevel();
+           }
+       }
                     }
 
-                    continue;
+    private void startSpill() throws ExecException {
+        if (!rawInputMap.isEmpty()) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
+            }
+            aggregateFirstLevel();
+            if (LOG.isInfoEnabled()) {
+                LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
+            }
+        }
+        if (!processedInputMap.isEmpty()) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
+            }
+            aggregateSecondLevel();
+            if (LOG.isInfoEnabled()) {
+                LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
+            }
                 }
+        doSpill = true;
+        spillingIterator = processedInputMap.entrySet().iterator();
             }
+
+    private Result spillResult() throws ExecException {
+        // if no more to spill, return EOP_RESULT.
+        if (processedInputMap.isEmpty()) {
+            LOG.info("In spillResults(), processed map is empty -- done spilling.");
+            return EOP_RESULT;
+        } else {
+            Map.Entry<Object, List<Tuple>> entry = spillingIterator.next();
+            Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
+            numRecsInProcessedMap -= entry.getValue().size();
+            spillingIterator.remove();
+            Result res = getOutput(entry.getKey(), valueTuple);
+            return res;
         }
     }
 
-    private void updateMaxMapSize(Object result) {
-        long size = SizeUtil.getMapEntrySize(currentKey,
-                result);
-        memLimits.addNewObjSize(size);
-        maxHashMapSize = memLimits.getCacheLimit();
+    private void aggregateRawRow(Object key) throws ExecException {
+        List<Tuple> value = rawInputMap.get(key);
+        Tuple valueTuple = createValueTuple(key, value);
+        Result res = getOutput(key, valueTuple);
+        rawInputMap.remove(key);
+        addKeyValToMap(processedInputMap, key, getAggResultTuple(res.result));
+        numRecsInProcessedMap += valueTuple.size() - 1;
     }
 
     /**
-     * Aggregate values accumulated in
-     * 
+     * For each entry in rawInputMap, feed the list of tuples into the aggregator funcs
+     * and add the results to processedInputMap. Remove the entries from rawInputMap as we go.
      * @throws ExecException
      */
-    private void aggregateCurrentValues() throws ExecException {
-        for (int i = 0; i < valuePlans.size(); i++) {
-            valuePlans.get(i).attachInput(valueTuple);
-            Result valRes = getResult(valueLeaves.get(i));
-            if (valRes == ERR_RESULT) {
-                throw new ExecException(
-                        "Error computing aggregate during in-map partial aggregation");
+    private int aggregate(Map<Object, List<Tuple>> fromMap, Map<Object, List<Tuple>> toMap, int numEntriesInTarget) throws ExecException {
+        Iterator<Map.Entry<Object, List<Tuple>>> iter = fromMap.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<Object, List<Tuple>> entry = iter.next();
+            Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
+            Result res = getOutput(entry.getKey(), valueTuple);
+            iter.remove();
+            addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
+            numEntriesInTarget += valueTuple.size() - 1;
             }
-
-            Tuple aggVal = getAggResultTuple(valRes.result);
-
-            // i'th plan should read only from i'th bag
-            // so we are done with i'th bag, clear it and
-            // add the new agg result to it
-            DataBag valBag = (DataBag) valueTuple.get(i + 1);
-            valBag.clear();
-            valBag.add(aggVal);
-
-            valuePlans.get(i).detachInput();
+        return numEntriesInTarget;
         }
+
+    private void aggregateFirstLevel() throws ExecException {
+        numRecsInProcessedMap = aggregate(rawInputMap, processedInputMap, numRecsInProcessedMap);
+        numRecsInRawMap = 0;
     }
 
-    private void init(Object key, Tuple inpTuple) throws ExecException {
-        tupleFact = TupleFactory.getInstance();
+    private void aggregateSecondLevel() throws ExecException {
+        Map<Object, List<Tuple>> newMap = Maps.newHashMapWithExpectedSize(processedInputMap.size());
+        numRecsInProcessedMap = aggregate(processedInputMap, newMap, 0);
+        processedInputMap = newMap;
+    }
 
-        // value tuple has bags of values for currentKey
-        valueTuple = tupleFact.newTuple(valuePlans.size() + 1);
+    private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws ExecException {
+        Tuple valueTuple = TF.newTuple(valuePlans.size() + 1);
+        valueTuple.set(0, key);
 
         for (int i = 0; i < valuePlans.size(); i++) {
-            valueTuple.set(i + 1, new DefaultDataBag(new ArrayList<Tuple>(
-                    MAX_SIZE_CURVAL_CACHE)));
+            DataBag bag = BG.newDefaultBag();
+            valueTuple.set(i + 1, bag);
+        }
+        for (Tuple t : inpTuples) {
+            for (int i = 1; i < t.size(); i++) {
+                DataBag bag = (DataBag) valueTuple.get(i);
+                bag.add((Tuple) t.get(i));
+            }
         }
 
-        // set current key, add value
-        currentKey = key;
-        addToCurrentValues(inpTuple);
-        aggMap = new HashMap<Object, Tuple>();
-
-        // TODO: keep track of actual number of objects that share the
-        // memory limit. For now using a default of 3, which is what is
-        // used by InternalCachedBag
-        memLimits = new MemoryLimits(3, -1);
-        maxHashMapSize = Integer.MAX_VALUE;
-
+        return valueTuple;
     }
 
     private Tuple getAggResultTuple(Object result) throws ExecException {
@@ -357,109 +401,43 @@ public class POPartialAgg extends Physic
         }
     }
 
-    private void checkSizeReduction() throws ExecException {
-
-        num_inp_recs++;
-        if (num_inp_recs == NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION
-                || (aggMap != null && aggMap.size() == maxHashMapSize - 1)) {
-            // the above check for the hashmap current size is
-            // done to avoid having to keep track of any dumps that
-            // could
-            // happen before NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION is
-            // reached
-
-            sizeReductionChecked = true;
-
-            // find out how many output records we have for this many
-            // input records
-
-            int outputReduction = aggMap.size() == 0 ? Integer.MAX_VALUE
-                    : num_inp_recs / aggMap.size();
-            int min_output_reduction = getMinOutputReductionFromProp();
-            if (outputReduction < min_output_reduction) {
-                disableMapAgg = true;
-                log.info("Disabling in-map partial aggregation because the "
-                        + "reduction in tuples (" + outputReduction
-                        + ") is lower than threshold (" + min_output_reduction
-                        + ")");
-                logCapacityOfAggMap();
-                // get current key vals output
-                Result output = getOutput();
-
-                // free the variables not needed anymore
-                currentKey = null;
-                valueTuple = null;
-
-                // store the output into hash map for now
-                addOutputToAggMap(output);
-
-                mapDumpIterator = aggMap.values().iterator();
-            }
-        }
-
-    }
-
-    private void logCapacityOfAggMap() {
-        log.info("Maximum capacity of hashmap used for map"
-                + " partial aggregation was " + maxHashMapSize + " entries");
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // combiner optimizer does not get invoked if the plan is being executed
+        // under illustrate, so POPartialAgg should not get used in that case
+        throw new UnsupportedOperationException();
     }
 
-    private void addOutputToAggMap(Result output) throws ExecException {
-        aggMap.put(((Tuple) output.result).get(0), (Tuple) output.result);
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitPartialAgg(this);
     }
 
     private int getMinOutputReductionFromProp() {
         int minReduction = PigMapReduce.sJobConfInternal.get().getInt(
-                PROP_PARTAGG_MINREDUCTION, 0);
-     
+                PigConfiguration.PARTAGG_MINREDUCTION, DEFAULT_MIN_REDUCTION);
         if (minReduction <= 0) {
-            // the default minimum reduction is 10
+            LOG.info("Specified reduction is <= 0 (" + minReduction + "). Using default " + DEFAULT_MIN_REDUCTION);
             minReduction = DEFAULT_MIN_REDUCTION;
         }
         return minReduction;
     }
 
-    private Result getNextResFromMap() {
-        if (!mapDumpIterator.hasNext()) {
-            mapDumpIterator = null;
-            return EOP_RESULT;
+    private float getPercentUsageFromProp() {
+        float percent = 0.2F;
+        if (PigMapReduce.sJobConfInternal.get() != null) {
+            String usage = PigMapReduce.sJobConfInternal.get().get(
+                    PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+            if (usage != null) {
+                percent = Float.parseFloat(usage);
         }
-        Tuple outTuple = mapDumpIterator.next();
-        mapDumpIterator.remove();
-        return new Result(POStatus.STATUS_OK, outTuple);
-    }
-
-    private Result getOutput() throws ExecException {
-        Tuple output = tupleFact.newTuple(valuePlans.size() + 1);
-        output.set(0, currentKey);
-
-        for (int i = 0; i < valuePlans.size(); i++) {
-            valuePlans.get(i).attachInput(valueTuple);
-            Result valRes = getResult(valueLeaves.get(i));
-            if (valRes == ERR_RESULT) {
-                return ERR_RESULT;
-            }
-            output.set(i + 1, valRes.result);
         }
-        return new Result(POStatus.STATUS_OK, output);
+        return percent;
     }
 
-    private void resetCurrentValues() throws ExecException {
-        for (int i = 1; i < valueTuple.size(); i++) {
-            ((DataBag) valueTuple.get(i)).clear();
-        }
-    }
-
-    private void addToCurrentValues(Tuple inpTuple) throws ExecException {
-        for (int i = 1; i < inpTuple.size(); i++) {
-            DataBag bag = (DataBag) valueTuple.get(i);
-            bag.add((Tuple) inpTuple.get(i));
-        }
-    }
 
     private Result getResult(ExpressionOperator op) throws ExecException {
         Result res = ERR_RESULT;
-
         switch (op.getResultType()) {
         case DataType.BAG:
         case DataType.BOOLEAN:
@@ -488,6 +466,28 @@ public class POPartialAgg extends Physic
         return ERR_RESULT;
     }
 
+    /**
+     * Runs the provided key-value pair through the aggregator plans.
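+     * @param key the key for this entry; stored in field 0 of the output tuple
+     * @param value the tuple attached as input to each of the value plans
+     * @return Result, containing a tuple of form (key, tupleReturnedByPlan1, tupleReturnedByPlan2, ...), or ERR_RESULT if any plan yields ERR_RESULT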
+     * @throws ExecException
+     */
+    private Result getOutput(Object key, Tuple value) throws ExecException {
+        Tuple output = TF.newTuple(valuePlans.size() + 1);
+        output.set(0, key);
+
+        for (int i = 0; i < valuePlans.size(); i++) {
+            valuePlans.get(i).attachInput(value);
+            Result valRes = getResult(valueLeaves.get(i));
+            if (valRes == ERR_RESULT) {
+                return ERR_RESULT;
+            }
+            output.set(i + 1, valRes.result);
+        }
+        return new Result(POStatus.STATUS_OK, output);
+    }
+
     @Override
     public boolean supportsMultipleInputs() {
         return false;

Modified: pig/trunk/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/Distinct.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/Distinct.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/Distinct.java Thu Aug 30 05:10:34 2012
@@ -27,6 +27,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 
@@ -82,8 +83,7 @@ public class Distinct  extends EvalFunc<
             // representing the data we want to distinct. 
             // unwrap, put in a bag and send down
             try {
-                DataBag bag = bagFactory.newDefaultBag();
-                bag.add((Tuple)input.get(0));
+                DataBag bag = new SingleTupleBag((Tuple)input.get(0));
                 return tupleFactory.newTuple(bag);
             } catch (ExecException e) {
                 throw e;

Modified: pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Thu Aug 30 05:10:34 2012
@@ -36,6 +36,7 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -81,7 +82,7 @@ public class InternalDistinctBag extends
         if (percent < 0) {
         	percent = 0.2F;            
         	if (PigMapReduce.sJobConfInternal.get() != null) {
-        		String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
+        		String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
         		if (usage != null) {
         			percent = Float.parseFloat(usage);
         		}
@@ -95,15 +96,18 @@ public class InternalDistinctBag extends
     	mContents = new HashSet<Tuple>();      
     }
     
+    @Override
     public boolean isSorted() {
         return false;
     }
     
+    @Override
     public boolean isDistinct() {
         return true;
     }
     
     
+    @Override
     public long size() {
         if (mSpillFiles != null && mSpillFiles.size() > 0){
             //We need to racalculate size to guarantee a count of unique 
@@ -121,6 +125,7 @@ public class InternalDistinctBag extends
     }
     
     
+    @Override
     public Iterator<Tuple> iterator() {
         return new DistinctDataBagIterator();
     }
@@ -147,6 +152,7 @@ public class InternalDistinctBag extends
         }    	
     }
 
+    @Override
     public void addAll(DataBag b) {
     	Iterator<Tuple> iter = b.iterator();
     	while(iter.hasNext()) {
@@ -154,6 +160,7 @@ public class InternalDistinctBag extends
     	}
     }
 
+    @Override
     public void addAll(Collection<Tuple> c) {
     	Iterator<Tuple> iter = c.iterator();
     	while(iter.hasNext()) {
@@ -173,11 +180,13 @@ public class InternalDistinctBag extends
             public Tuple tuple;
             public int fileNum;
 
+            @Override
             @SuppressWarnings("unchecked")
 			public int compareTo(TContainer other) {
                 return tuple.compareTo(other.tuple);
             }
             
+            @Override
             public boolean equals(Object obj) {
             	if (obj instanceof TContainer) {
             		return compareTo((TContainer)obj) == 0;
@@ -186,6 +195,7 @@ public class InternalDistinctBag extends
             	return false;
             }
             
+            @Override
             public int hashCode() {
             	return tuple.hashCode();
             }
@@ -214,12 +224,14 @@ public class InternalDistinctBag extends
             }            
         }
 
+        @Override
         public boolean hasNext() { 
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
         }
 
+        @Override
         public Tuple next() {
             // This will report progress every 1024 times through next.
             // This should be much faster than using mod.
@@ -245,6 +257,7 @@ public class InternalDistinctBag extends
         /**
          * Not implemented.
          */
+        @Override
         public void remove() {}
 
         private Tuple readFromTree() {
@@ -464,6 +477,7 @@ public class InternalDistinctBag extends
         }
     }
 
+    @Override
     public long spill(){
         return proactive_spill(null);
     }

Modified: pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SelfSpillBag.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/SelfSpillBag.java Thu Aug 30 05:10:34 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.data;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -51,7 +52,6 @@ public abstract class SelfSpillBag exten
     @InterfaceStability.Evolving
     public static class MemoryLimits {
 
-        public static final String PROP_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
         private long maxMemUsage;
         private int cacheLimit = Integer.MAX_VALUE;
         private long memUsage = 0;
@@ -71,7 +71,7 @@ public abstract class SelfSpillBag exten
                 percent = 0.2F;
                 if (PigMapReduce.sJobConfInternal.get() != null) {
                     String usage = PigMapReduce.sJobConfInternal.get().get(
-                            PROP_CACHEDBAG_MEMUSAGE);
+                            PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
                     if (usage != null) {
                         percent = Float.parseFloat(usage);
                     }
@@ -79,7 +79,7 @@ public abstract class SelfSpillBag exten
             }
 
             long max = Runtime.getRuntime().maxMemory();
-            maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
+            maxMemUsage = (long) ((max * percent) / bagCount);
 
             // set limit to 0, if memusage is 0 or really really small.
             // then all tuples are put into disk

Modified: pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java (original)
+++ pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java Thu Aug 30 05:10:34 2012
@@ -56,13 +56,8 @@ public class PigPerformanceLoader extend
     class Caster implements LoadCaster {
         
         Utf8StorageConverter helper = new Utf8StorageConverter();
-        /**
-         * 
-         */
-        public Caster() {
-            // TODO Auto-generated constructor stub
-        }
         
+        @Override
         public DataBag bytesToBag(byte[] b, ResourceFieldSchema fs) throws IOException {
             if (b == null) return null;
 
@@ -102,10 +97,12 @@ public class PigPerformanceLoader extend
             return bag;
         }
 
+		@Override
 		public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
-			throw new UnsupportedOperationException();
+			return helper.bytesToMap(b);
 		}
 
+        @Override
         public Map<String, Object> bytesToMap(byte[] b) throws IOException {
             if (b == null) return null;