You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/09/15 19:51:30 UTC

svn commit: r1171195 [1/2] - in /pig/trunk: ./ conf/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/apache/pig/backend/hadoop/executi...

Author: thejas
Date: Thu Sep 15 17:51:27 2011
New Revision: 1171195

URL: http://svn.apache.org/viewvc?rev=1171195&view=rev
Log:
PIG-2228: support partial aggregation in map task (thejas)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
    pig/trunk/src/org/apache/pig/data/SizeUtil.java
    pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
    pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/Algebraic.java
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    pig/trunk/src/org/apache/pig/data/DefaultTuple.java
    pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
    pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
    pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
    pig/trunk/src/org/apache/pig/data/SortedSpillBag.java
    pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/org/apache/pig/test/TestDataBag.java
    pig/trunk/test/org/apache/pig/test/Util.java
    pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep 15 17:51:27 2011
@@ -130,6 +130,8 @@ OPTIMIZATIONS
 
 PIG-2011: Speed up TestTypedMap.java (dvryaboy)
 
+PIG-2228: support partial aggregation in map task (thejas)
+
 BUG FIXES
 
 PIG-2273: Pig.compileFromFile in embedded python fails when pig script starts with a comment (ddaniels888 via gates)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Thu Sep 15 17:51:27 2011
@@ -1,5 +1,7 @@
 # Pig configuration file. All values can be overwritten by command line arguments.
 
+# Use the "-h properties" command to see description of the properties
+
 # log4jconf log4j configuration file
 # log4jconf=./conf/log4j.properties
 
@@ -25,4 +27,31 @@
 
 #the following two parameters are to help estimate the reducer number
 #pig.exec.reducers.bytes.per.reducer=1000000000
-#pig.exec.reducers.max=999
\ No newline at end of file
+#pig.exec.reducers.max=999
+
+#Logging properties
+#verbose=false
+#brief=false
+#debug=INFO
+#aggregate.warning=true
+
+#Performance tuning properties
+#pig.cachedbag.memusage=0.2
+#pig.skewedjoin.reduce.memusagea=0.3
+#pig.exec.nocombiner=false
+#opt.multiquery=true
+#pig.tmpfilecompression=false
+
+#value can be lzo or gzip
+#pig.tmpfilecompression.codec=gzip
+#pig.noSplitCombination=true
+#pig.exec.mapPartAgg=false
+#pig.exec.mapPartAgg.minReduction=10
+
+
+#exectype=mapreduce
+#pig.additional.jars=<comma seperated list of jars>
+#udf.import.list=<comma seperated list of imports>
+#stop.on.failure=false
+
+

Modified: pig/trunk/src/org/apache/pig/Algebraic.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Algebraic.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Algebraic.java (original)
+++ pig/trunk/src/org/apache/pig/Algebraic.java Thu Sep 15 17:51:27 2011
@@ -39,6 +39,7 @@ import org.apache.pig.classification.Int
  * whether the intermediate function is called 0, 1, or more times.  Hadoop makes no guarantees
  * about how many times the combiner will be called in a job.
  *
+ *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -47,12 +48,14 @@ public interface Algebraic{
     /**
      * Get the initial function. 
      * @return A function name of f_init. f_init should be an eval func.
+     * The return type of f_init.exec() has to be Tuple
      */
     public String getInitial();
 
     /**
      * Get the intermediate function. 
      * @return A function name of f_intermed. f_intermed should be an eval func.
+     * The return type of f_intermed.exec() has to be Tuple
      */
     public String getIntermed();
 

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Thu Sep 15 17:51:27 2011
@@ -800,6 +800,12 @@ public static void printProperties(){
         System.out.println("            Used in conjunction with pig.tmpfilecompression. Defines compression type."); 
         System.out.println("        pig.noSplitCombination=true|false. Split combination is on by default.");
         System.out.println("            Determines if multiple small files are combined into a single map.");
+        System.out.println("        pig.exec.mapPartAgg=true|false. Default is false.");
+        System.out.println("            Determines if partial aggregation is done within map phase, ");
+        System.out.println("            before records are sent to combiner.");
+        System.out.println("        pig.exec.mapPartAgg.minReduction=<min aggregation factor>. Default is 10.");
+        System.out.println("            If the in-map partial aggregation does not reduce the output num records");
+        System.out.println("            by this factor, it gets disabled.");        
         System.out.println("    Miscellaneous:");
         System.out.println("        exectype=mapreduce|local; default is mapreduce. This property is the same as -x switch");
         System.out.println("        pig.additional.jars=<comma seperated list of jars>. Used in place of register command.");

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Sep 15 17:51:27 2011
@@ -24,12 +24,14 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.pig.PigException;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigWarning;
 import org.apache.pig.data.DataType;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -45,6 +47,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -91,15 +94,18 @@ public class CombinerOptimizer extends M
 
     private CompilationMessageCollector messageCollector = null;
 
-    public CombinerOptimizer(MROperPlan plan, String chunkSize) {
-        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
-        messageCollector = new CompilationMessageCollector() ; 
+    private boolean doMapAgg;
+
+    public CombinerOptimizer(MROperPlan plan, boolean doMapAgg) {
+        this(plan, doMapAgg, new CompilationMessageCollector());
     }
 
-    public CombinerOptimizer(MROperPlan plan, String chunkSize, 
+    public CombinerOptimizer(MROperPlan plan, boolean doMapAgg, 
             CompilationMessageCollector messageCollector) {
+
         super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
-        this.messageCollector = messageCollector ; 
+        this.messageCollector = messageCollector;
+        this.doMapAgg = doMapAgg;
     }
 
     public CompilationMessageCollector getMessageCollector() {
@@ -264,12 +270,17 @@ public class CombinerOptimizer extends M
                 mr.combinePlan.add(combinePack);
                 mr.combinePlan.add(cfe);
                 mr.combinePlan.connect(combinePack, cfe);
+
                 // No need to connect projections in cfe to cp, because
                 // PigCombiner directly attaches output from package to
                 // root of remaining plan.
 
                 POLocalRearrange mlr = getNewRearrange(rearrange);
 
+                POPartialAgg mapAgg = null;
+                if(doMapAgg){
+                    mapAgg = createPartialAgg(cfe);
+                }
 
                 // A specialized local rearrange operator will replace
                 // the normal local rearrange in the map plan. This behaves
@@ -284,7 +295,7 @@ public class CombinerOptimizer extends M
                 // it is added to the end (This is required so that we can 
                 // set up the inner plan of the new Local Rearrange leaf in the map
                 // and combine plan to contain just the project of the key).
-                patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mlr);
+                patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mapAgg, mlr);
                 POLocalRearrange clr = getNewRearrange(rearrange);
 
                 mr.combinePlan.add(clr);
@@ -317,6 +328,31 @@ public class CombinerOptimizer extends M
 
 
     /**
+     * Translate POForEach in combiner into a POPartialAgg
+     * @param combineFE
+     * @return partial aggregate operator
+     * @throws CloneNotSupportedException 
+     */
+    private POPartialAgg createPartialAgg(POForEach combineFE)
+            throws CloneNotSupportedException {
+        String scope = combineFE.getOperatorKey().scope;
+        POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope, 
+                NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+        poAgg.setAlias(combineFE.getAlias());
+        poAgg.setResultType(combineFE.getResultType());
+
+        //first plan in combine foreach is the group key
+        poAgg.setKeyPlan(combineFE.getInputPlans().get(0).clone());
+
+        List<PhysicalPlan> valuePlans = new ArrayList<PhysicalPlan>();
+        for(int i=1; i<combineFE.getInputPlans().size(); i++){
+            valuePlans.add(combineFE.getInputPlans().get(i).clone());
+        }
+        poAgg.setValuePlans(valuePlans);
+        return poAgg;
+    }
+
+    /**
      * find algebraic operators and also check if the foreach statement
      *  is suitable for combiner use
      * @param feInners inner plans of foreach
@@ -544,11 +580,13 @@ public class CombinerOptimizer extends M
      * @param mapPlan
      * @param preCombinerLR
      * @param mfe
+     * @param mapAgg 
      * @param mlr
      * @throws PlanException 
      */
     private void patchUpMap(PhysicalPlan mapPlan, POPreCombinerLocalRearrange preCombinerLR,
-            POForEach mfe, POLocalRearrange mlr) throws PlanException {
+            POForEach mfe, POPartialAgg mapAgg, POLocalRearrange mlr)
+                    throws PlanException {
 
         POLocalRearrange oldLR = (POLocalRearrange)mapPlan.getLeaves().get(0);
         mapPlan.replace(oldLR, preCombinerLR);
@@ -556,8 +594,17 @@ public class CombinerOptimizer extends M
         mapPlan.add(mfe);
         mapPlan.connect(preCombinerLR, mfe);
 
+        //the operator before local rearrange
+        PhysicalOperator opBeforeLR = mfe;
+
+        if(mapAgg != null){
+            mapPlan.add(mapAgg);
+            mapPlan.connect(mfe, mapAgg);
+            opBeforeLR = mapAgg;
+        }
+
         mapPlan.add(mlr);
-        mapPlan.connect(mfe, mlr);
+        mapPlan.connect(opBeforeLR, mlr);
     }
 
     /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Sep 15 17:51:27 2011
@@ -56,7 +56,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -75,6 +74,15 @@ import org.apache.pig.tools.pigstats.Scr
  *
  */
 public class MapReduceLauncher extends Launcher{
+
+    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+    
+    public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+        "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
+    public static final String PROP_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
+
+    
     private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
  
     //used to track the exception thrown by the job control which is run in a separate thread
@@ -83,11 +91,6 @@ public class MapReduceLauncher extends L
     private boolean aggregateWarning = false;
 
     private Map<FileSpec, Exception> failureMap;
-
-    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
-    
-    public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
-        "mapreduce.fileoutputcommitter.marksuccessfuljobs";
     
     private JobControl jc=null;
     
@@ -516,7 +519,9 @@ public class MapReduceLauncher extends L
         //String prop = System.getProperty("pig.exec.nocombiner");
         String prop = pc.getProperties().getProperty("pig.exec.nocombiner");
         if (!pc.inIllustrator && !("true".equals(prop)))  {
-            CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
+            boolean doMapAgg = 
+                    Boolean.valueOf(pc.getProperties().getProperty(PROP_EXEC_MAP_PARTAGG,"false"));
+            CombinerOptimizer co = new CombinerOptimizer(plan, doMapAgg);
             co.visit();
             //display the warning message(s) from the CombinerOptimizer
             co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Sep 15 17:51:27 2011
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -278,6 +277,12 @@ public class PhyPlanSetter extends PhyPl
         lrfi.setParentPlan(parent);
     }
 */
+
+    @Override
+    public void visitPartialAgg(POPartialAgg poPartialAgg) {
+       poPartialAgg.setParentPlan(parent);
+    }
+
     @Override
     public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) {
         optimizedForEach.setParentPlan(parent);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Thu Sep 15 17:51:27 2011
@@ -23,6 +23,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -94,6 +95,12 @@ public class EndOfAllInputSetter extends
                 throws VisitorException {
             endOfAllInputFlag = true;
         }
+
+        @Override
+        public void visitPartialAgg(POPartialAgg partAgg){
+            endOfAllInputFlag = true;
+        }
+
         /**
          * @return if end of all input is present
          */

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Sep 15 17:51:27 2011
@@ -61,7 +61,8 @@ public class PhyPlanVisitor extends Plan
         visit();
         popWalker();
     }
- 
+
+
     public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
         List<PhysicalPlan> inpPlans = mg.getPlans();
         for (PhysicalPlan plan : inpPlans) {
@@ -298,7 +299,9 @@ public class PhyPlanVisitor extends Plan
     public void visitPreCombinerLocalRearrange(
             POPreCombinerLocalRearrange preCombinerLocalRearrange) {
         // TODO Auto-generated method stub
-        
+    }
+
+    public void visitPartialAgg(POPartialAgg poPartialAgg) {
     }
 
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Thu Sep 15 17:51:27 2011
@@ -156,6 +156,10 @@ public class PlanPrinter<O extends Opera
           else if(node instanceof POLocalRearrange){
             sb.append(planString(((POLocalRearrange)node).getPlans()));
           }
+          else if(node instanceof POPartialAgg){
+              sb.append(planString(((POPartialAgg)node).getKeyPlan()));
+              sb.append(planString(((POPartialAgg)node).getValuePlans()));
+          }
           else if(node instanceof POCollectedGroup){
             sb.append(planString(((POCollectedGroup)node).getPlans()));
           }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Sep 15 17:51:27 2011
@@ -354,6 +354,7 @@ public class POLocalRearrange extends Ph
         return inp;
     }
 
+
     private void detachPlans(List<PhysicalPlan> plans) {
         for (PhysicalPlan ep : plans) {
             ep.detachInput();

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1171195&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Thu Sep 15 17:51:27 2011
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.SelfSpillBag.MemoryLimits;
+import org.apache.pig.data.SizeUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Do partial aggregation in map plan. It uses a hash-map to aggregate. If
+ * consecutive records have same key, it will aggregate those without adding
+ * them to the hash-map. As future optimization, the use of hash-map could be
+ * disabled when input data is sorted on group-by keys
+ */
+public class POPartialAgg extends PhysicalOperator {
+
+    public static final String PROP_PARTAGG_MINREDUCTION = "pig.exec.mapPartAgg.minReduction";
+
+    private static final Log log = LogFactory.getLog(POPartialAgg.class);
+    private static final long serialVersionUID = 1L;
+
+    private PhysicalPlan keyPlan;
+    private ExpressionOperator keyLeaf;
+
+    private List<PhysicalPlan> valuePlans;
+    private List<ExpressionOperator> valueLeaves;
+    private static final Result ERR_RESULT = new Result();
+    private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
+            null);
+
+    // run time variables
+    private transient Object currentKey = null;
+    private transient Map<Object, Tuple> aggMap;
+    // tuple of the format - (null(key),bag-val1,bag-val2,...)
+    // attach this to the plans with algebraic udf before evaluating the plans
+    private transient Tuple valueTuple = null;
+
+    private boolean isFinished = false;
+
+    private transient Iterator<Tuple> mapDumpIterator;
+    private transient int numToDump;
+
+    // maximum bag size of currentValues cached before aggregation is done
+    private static final int MAX_SIZE_CURVAL_CACHE = 1024;
+
+    // number of records to sample to determine average size used by each
+    // entry in hash map
+    private static final int NUM_RESRECS_TO_SAMPLE_SZ_ESTIMATE = 100;
+
+    // params for auto disabling map aggregation
+    private static final int NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION = 1000;
+
+    private static final int DEFAULT_MIN_REDUCTION = 10;
+
+    private boolean disableMapAgg = false;
+    private int num_inp_recs;
+    private boolean sizeReductionChecked = false;
+
+    private transient int maxHashMapSize;
+
+    private transient TupleFactory tupleFact;
+    private transient MemoryLimits memLimits;
+
+    public POPartialAgg(OperatorKey k) {
+        super(k);
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // combiner optimizer does not get invoked if the plan is being executed
+        // under illustrate, so POPartialAgg should not get used in that case
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitPartialAgg(this);
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+
+        if (disableMapAgg) {
+            // map aggregation has been automatically disabled
+            if (mapDumpIterator != null) {
+                // there are some accumulated entries in map to be dumped
+                return getNextResFromMap();
+            } else {
+                Result inp = processInput();
+                if (disableMapAgg) {
+                    // the in-map partial aggregation is an optional step, just
+                    // like the combiner.
+                    // act as if this operator was never there, by just 
+                    // returning the input
+                    return inp;
+                }
+            }
+        }
+
+        if (mapDumpIterator != null) {
+            // if this iterator is not null, we are process of dumping records
+            // from the map
+            if (isFinished) {
+                return getNextResFromMap();
+            } else if (numToDump > 0) {
+                // there are some tuples yet to be dumped, to free memory
+                --numToDump;
+                return getNextResFromMap();
+            } else {
+                mapDumpIterator = null;
+            }
+        }
+
+        if (isFinished) {
+            // done with dumping all records
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        while (true) {
+            //process each input until EOP
+            Result inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_ERR) {
+                // error
+                return inp;
+            }
+            if (inp.returnStatus == POStatus.STATUS_EOP) {
+                if (parentPlan.endOfAllInput) {
+                    // it is actually end of all input
+                    // start dumping results
+                    isFinished = true;
+                    logCapacityOfAggMap();
+                    // check if there was ANY input
+                    if (valueTuple == null) {
+                        return EOP_RESULT;
+                    }
+
+                    // first return agg for currentKey
+                    Result output = getOutput();
+                    aggMap.remove(currentKey);
+
+                    mapDumpIterator = aggMap.values().iterator();
+
+                    // free the variables not needed anymore
+                    currentKey = null;
+                    valueTuple = null;
+
+                    return output;
+                } else {
+                    // return EOP
+                    return inp;
+                }
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+
+            // check if this operator is doing a good job of reducing the number
+            // of records going to output to justify the costs of itself
+            // if not , disable map partial agg
+            if ((!sizeReductionChecked)) {
+                checkSizeReduction();
+
+                if (disableMapAgg) {
+                    // in-map partial aggregation just got disabled
+                    // return the new input record, it has not been aggregated
+                    return inp;
+                }
+            }
+
+            // we have some real input data
+
+            // setup input for keyplan
+            Tuple inpTuple = (Tuple) inp.result;
+            keyPlan.attachInput(inpTuple);
+
+            // evaluate the key
+            Result keyRes = getResult(keyLeaf);
+            if (keyRes == ERR_RESULT) {
+                return ERR_RESULT;
+            }
+            Object key = keyRes.result;
+            keyPlan.detachInput();
+
+            if (valueTuple == null) {
+                // this is the first record the operator is seeing
+                // do initializations
+                init(key, inpTuple);
+                continue;
+            } else {
+                // check if key changed
+                boolean keyChanged = (currentKey != null && key == null)
+                        || ((key != null) && (!key.equals(currentKey)));
+
+                if (!keyChanged) {
+                    addToCurrentValues(inpTuple);
+
+                    // if there are enough number of values,
+                    // aggregate the values accumulated in valueTuple
+                    if (((DefaultDataBag) valueTuple.get(1)).size() >= MAX_SIZE_CURVAL_CACHE) {
+                        // not a key change, so store the agg result back to bag
+                        aggregateCurrentValues();
+                    }
+                    continue;
+                } else {// new key
+
+                    // compute aggregate for currentKey
+                    Result output = getOutput();
+                    if (output.returnStatus != POStatus.STATUS_OK) {
+                        return ERR_RESULT;
+                    }
+                    
+                    // set new current key, value
+                    currentKey = key;
+                    resetCurrentValues();
+                    addToCurrentValues(inpTuple);
+
+                    // get existing result from map (if any) and add it to
+                    // current values
+                    Tuple existingResult = aggMap.get(key);
+
+                    // existingResult will be null only if key is absent in
+                    // aggMap
+                    if (existingResult != null) {
+                        addToCurrentValues(existingResult);
+                    }
+
+                    // storing a new entry in the map, so update estimate of
+                    // num of entries that will fit into the map
+                    if (memLimits.getNumObjectsSizeAdded() < NUM_RESRECS_TO_SAMPLE_SZ_ESTIMATE) {
+                        updateMaxMapSize(output.result);
+                    }
+
+                    // check if it is time to dump some aggs from the hashmap
+                    if (aggMap.size() >= maxHashMapSize) {
+                        // dump 10% of max hash size because dumping just one
+                        // record at a time might result in most group key being
+                        // dumped (depending on hashmap implementation)
+                        // TODO: dump the least recently/frequently used entries
+                        numToDump = maxHashMapSize / 10;
+                        mapDumpIterator = aggMap.values().iterator();
+
+                        return output;
+                    } else {
+                        // there is space available in the hashmap, store the
+                        // output there
+                        addOutputToAggMap(output);
+                    }
+
+                    continue;
+                }
+            }
+        }
+    }
+
+    private void updateMaxMapSize(Object result) {
+        long size = SizeUtil.getMapEntrySize(currentKey,
+                result);
+        memLimits.addNewObjSize(size);
+        maxHashMapSize = memLimits.getCacheLimit();
+    }
+
+    /**
+     * Aggregate values accumulated in
+     * 
+     * @throws ExecException
+     */
+    private void aggregateCurrentValues() throws ExecException {
+        for (int i = 0; i < valuePlans.size(); i++) {
+            valuePlans.get(i).attachInput(valueTuple);
+            Result valRes = getResult(valueLeaves.get(i));
+            if (valRes == ERR_RESULT) {
+                throw new ExecException(
+                        "Error computing aggregate during in-map partial aggregation");
+            }
+
+            Tuple aggVal = getAggResultTuple(valRes.result);
+
+            // i'th plan should read only from i'th bag
+            // so we are done with i'th bag, clear it and
+            // add the new agg result to it
+            DataBag valBag = (DataBag) valueTuple.get(i + 1);
+            valBag.clear();
+            valBag.add(aggVal);
+
+            valuePlans.get(i).detachInput();
+        }
+    }
+
+    private void init(Object key, Tuple inpTuple) throws ExecException {
+        tupleFact = TupleFactory.getInstance();
+
+        // value tuple has bags of values for currentKey
+        valueTuple = tupleFact.newTuple(valuePlans.size() + 1);
+
+        for (int i = 0; i < valuePlans.size(); i++) {
+            valueTuple.set(i + 1, new DefaultDataBag(new ArrayList<Tuple>(
+                    MAX_SIZE_CURVAL_CACHE)));
+        }
+
+        // set current key, add value
+        currentKey = key;
+        addToCurrentValues(inpTuple);
+        aggMap = new HashMap<Object, Tuple>();
+
+        // TODO: keep track of actual number of objects that share the
+        // memory limit. For now using a default of 3, which is what is
+        // used by InternalCachedBag
+        memLimits = new MemoryLimits(3, -1);
+        maxHashMapSize = Integer.MAX_VALUE;
+
+    }
+
+    private Tuple getAggResultTuple(Object result) throws ExecException {
+        try {
+            return (Tuple) result;
+        } catch (ClassCastException ex) {
+            throw new ExecException("Intermediate Algebraic "
+                    + "functions must implement EvalFunc<Tuple>");
+        }
+    }
+
+    private void checkSizeReduction() throws ExecException {
+
+        num_inp_recs++;
+        if (num_inp_recs == NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION
+                || (aggMap != null && aggMap.size() == maxHashMapSize - 1)) {
+            // the above check for the hashmap current size is
+            // done to avoid having to keep track of any dumps that
+            // could
+            // happen before NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION is
+            // reached
+
+            sizeReductionChecked = true;
+
+            // find out how many output records we have for this many
+            // input records
+
+            int outputReduction = aggMap.size() == 0 ? Integer.MAX_VALUE
+                    : num_inp_recs / aggMap.size();
+            int min_output_reduction = getMinOutputReductionFromProp();
+            if (outputReduction < min_output_reduction) {
+                disableMapAgg = true;
+                log.info("Disabling in-map partial aggregation because the "
+                        + "reduction in tuples (" + outputReduction
+                        + ") is lower than threshold (" + min_output_reduction
+                        + ")");
+                logCapacityOfAggMap();
+                // get current key vals output
+                Result output = getOutput();
+
+                // free the variables not needed anymore
+                currentKey = null;
+                valueTuple = null;
+
+                // store the output into hash map for now
+                addOutputToAggMap(output);
+
+                mapDumpIterator = aggMap.values().iterator();
+            }
+        }
+
+    }
+
+    private void logCapacityOfAggMap() {
+        log.info("Maximum capacity of hashmap used for map"
+                + " partial aggregation was " + maxHashMapSize + " entries");
+    }
+
+    private void addOutputToAggMap(Result output) throws ExecException {
+        aggMap.put(((Tuple) output.result).get(0), (Tuple) output.result);
+    }
+
+    private int getMinOutputReductionFromProp() {
+        int minReduction = PigMapReduce.sJobConfInternal.get().getInt(
+                PROP_PARTAGG_MINREDUCTION, 0);
+     
+        if (minReduction <= 0) {
+            // the default minimum reduction is 10
+            minReduction = DEFAULT_MIN_REDUCTION;
+        }
+        return minReduction;
+    }
+
+    private Result getNextResFromMap() {
+        if (!mapDumpIterator.hasNext()) {
+            mapDumpIterator = null;
+            return EOP_RESULT;
+        }
+        Tuple outTuple = mapDumpIterator.next();
+        mapDumpIterator.remove();
+        return new Result(POStatus.STATUS_OK, outTuple);
+    }
+
+    private Result getOutput() throws ExecException {
+        Tuple output = tupleFact.newTuple(valuePlans.size() + 1);
+        output.set(0, currentKey);
+
+        for (int i = 0; i < valuePlans.size(); i++) {
+            valuePlans.get(i).attachInput(valueTuple);
+            Result valRes = getResult(valueLeaves.get(i));
+            if (valRes == ERR_RESULT) {
+                return ERR_RESULT;
+            }
+            output.set(i + 1, valRes.result);
+        }
+        return new Result(POStatus.STATUS_OK, output);
+    }
+
+    private void resetCurrentValues() throws ExecException {
+        for (int i = 1; i < valueTuple.size(); i++) {
+            ((DataBag) valueTuple.get(i)).clear();
+        }
+    }
+
+    private void addToCurrentValues(Tuple inpTuple) throws ExecException {
+        for (int i = 1; i < inpTuple.size(); i++) {
+            DataBag bag = (DataBag) valueTuple.get(i);
+            bag.add((Tuple) inpTuple.get(i));
+        }
+    }
+
+    private Result getResult(ExpressionOperator op) throws ExecException {
+        Result res = ERR_RESULT;
+
+        switch (op.getResultType()) {
+        case DataType.BAG:
+        case DataType.BOOLEAN:
+        case DataType.BYTEARRAY:
+        case DataType.CHARARRAY:
+        case DataType.DOUBLE:
+        case DataType.FLOAT:
+        case DataType.INTEGER:
+        case DataType.LONG:
+        case DataType.MAP:
+        case DataType.TUPLE:
+            res = op.getNext(getDummy(op.getResultType()), op.getResultType());
+            break;
+        default:
+            String msg = "Invalid result type: "
+                    + DataType.findType(op.getResultType());
+            throw new ExecException(msg, 2270, PigException.BUG);
+        }
+
+        // allow null as group by key
+        if (res.returnStatus == POStatus.STATUS_OK
+                || res.returnStatus == POStatus.STATUS_NULL) {
+            return res;
+        }
+        return ERR_RESULT;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "Partial Agg" + "["
+                + DataType.findTypeName(resultType) + "]" + mKey.toString();
+
+    }
+
+    public PhysicalPlan getKeyPlan() {
+        return keyPlan;
+    }
+
+    public void setKeyPlan(PhysicalPlan keyPlan) {
+        this.keyPlan = keyPlan;
+        keyLeaf = (ExpressionOperator) keyPlan.getLeaves().get(0);
+    }
+
+    public List<PhysicalPlan> getValuePlans() {
+        return valuePlans;
+    }
+
+    public void setValuePlans(List<PhysicalPlan> valuePlans) {
+        this.valuePlans = valuePlans;
+        valueLeaves = new ArrayList<ExpressionOperator>();
+        for (PhysicalPlan plan : valuePlans) {
+            valueLeaves.add((ExpressionOperator) plan.getLeaves().get(0));
+        }
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultTuple.java Thu Sep 15 17:51:27 2011
@@ -206,27 +206,18 @@ public class DefaultTuple implements Tup
         + 32 /* mFields array list fixed size */;
 
         // rest of the fixed portion of mfields size is accounted within empty_tuple_size
-        long mfields_var_size = roundToEight(4 + 4 * mFields.size());
+        long mfields_var_size = SizeUtil.roundToEight(4 + 4 * mFields.size());
         // in java hotspot 32bit vm, there seems to be a minimum tuple size of 96
         // which is probably from the minimum size of this array list
         mfields_var_size = Math.max(40, mfields_var_size);
 
         long sum = empty_tuple_size + mfields_var_size;
         while (i.hasNext()) {
-            sum += getFieldMemorySize(i.next());
+            sum += SizeUtil.getPigObjMemSize(i.next());
         }
         return sum;
     }
 
-    /**
-     * Memory size of objects are rounded to multiple of 8 bytes
-     * 
-     * @param i
-     * @return i rounded to a equal of higher multiple of 8
-     */
-    private long roundToEight(long i) {
-        return 8 * ((i + 7) / 8); // integer division rounds the result down
-    }
 
     /**
      * Write a tuple of atomic values into a string. All values in the tuple must be atomic (no bags, tuples, or maps).
@@ -562,78 +553,6 @@ public class DefaultTuple implements Tup
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private long getFieldMemorySize(Object o) {
-        // 12 is added to each to account for the object overhead and the
-        // pointer in the tuple.
-        switch (DataType.findType(o)) {
-        case DataType.BYTEARRAY: {
-            byte[] bytes = ((DataByteArray) o).get();
-            // bytearray size including rounding to 8 bytes
-            long byte_array_sz = roundToEight(bytes.length + 12);
-
-            return byte_array_sz + 16 /* 16 is additional size of DataByteArray */;
-        }
-
-        case DataType.CHARARRAY: {
-            String s = (String) o;
-            // See PIG-1443 for a reference for this formula
-            return roundToEight((s.length() * 2) + 38);
-        }
-
-        case DataType.TUPLE: {
-            Tuple t = (Tuple) o;
-            return t.getMemorySize();
-        }
-
-        case DataType.BAG: {
-            DataBag b = (DataBag) o;
-            return b.getMemorySize();
-        }
-
-        case DataType.INTEGER:
-            return 4 + 8 + 4/* +4 to round to 8 bytes */;
-
-        case DataType.LONG:
-            return 8 + 8;
-
-        case DataType.MAP: {
-            Map<String, Object> m = (Map<String, Object>) o;
-            Iterator<Map.Entry<String, Object>> i = m.entrySet().iterator();
-            long sum = 0;
-            while (i.hasNext()) {
-                Map.Entry<String, Object> entry = i.next();
-                sum += getFieldMemorySize(entry.getKey());
-                sum += getFieldMemorySize(entry.getValue());
-            }
-            // based on experiments on 32 bit Java HotSpot VM
-            // size of map with 0 entries is 120 bytes
-            // each additional entry have around 24 bytes overhead at
-            // small number of entries. At larger number of entries, the
-            // overhead is around 32 bytes, probably because of the expanded
-            // data structures in anticapation of more entries being added
-            return sum + m.size() * 32 + 120;
-        }
-
-        case DataType.FLOAT:
-            return 4 + 8 + 4/* +4 to round to 8 bytes */;
-
-        case DataType.DOUBLE:
-            return 8 + 8;
-
-        case DataType.BOOLEAN:
-            // boolean takes 1 byte , +7 to round it to 8
-            return 1 + 8 + 7;
-
-        case DataType.NULL:
-            return 0;
-
-        default:
-            // ??
-            return 12;
-        }
-    }
-
     /**
      * @return true if this Tuple is null
      */

Modified: pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Thu Sep 15 17:51:27 2011
@@ -33,16 +33,16 @@ import java.util.NoSuchElementException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigCounters;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
 
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 
-public class InternalCachedBag extends DefaultAbstractBag {
+public class InternalCachedBag extends SelfSpillBag {
     private static final long serialVersionUID = 1L;
 
     private static final Log log = LogFactory.getLog(InternalCachedBag.class);
-    private transient int cacheLimit;
-    private transient long maxMemUsage;
-    private transient long memUsage;
     private transient DataOutputStream out;
     private transient boolean addDone;
     private transient TupleFactory factory;
@@ -51,58 +51,36 @@ public class InternalCachedBag extends D
     private transient int numTuplesSpilled = 0; 
  
     public InternalCachedBag() {
-        this(1);
+        this(1, -1f);
     }
 
     public InternalCachedBag(int bagCount) {       
-        float percent = 0.2F;
-        
-    	if (PigMapReduce.sJobConfInternal.get() != null) {
-    		String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
-    		if (usage != null) {
-    			percent = Float.parseFloat(usage);
-    		}
-    	}
-
-        init(bagCount, percent);
+        this(bagCount, -1f);
     }  
     
     public InternalCachedBag(int bagCount, float percent) {
-    	init(bagCount, percent);
+        super(bagCount, percent);
+    	init();
     }
     
-    private void init(int bagCount, float percent) {
-    	factory = TupleFactory.getInstance();        
-    	mContents = new ArrayList<Tuple>();             
-             	 
-    	long max = Runtime.getRuntime().maxMemory();
-        maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
-        cacheLimit = Integer.MAX_VALUE;
-        
-        // set limit to 0, if memusage is 0 or really really small.
-        // then all tuples are put into disk
-        if (maxMemUsage < 1) {
-        	cacheLimit = 0;
-        }
-        
+    private void init() {
+        factory = TupleFactory.getInstance();        
+        mContents = new ArrayList<Tuple>();                    
         addDone = false;
     }
 
+
     public void add(Tuple t) {
     	
         if(addDone) {
             throw new IllegalStateException("InternalCachedBag is closed for adding new tuples");
         }
                 
-        if(mContents.size() < cacheLimit)  {
+        if(mContents.size() < memLimit.getCacheLimit())  {
             mContents.add(t);           
             if(mContents.size() < 100)
             {
-                memUsage += t.getMemorySize();
-                long avgUsage = memUsage / (long)mContents.size();
-                if (avgUsage > 0) {
-                	cacheLimit = (int)(maxMemUsage / avgUsage);
-                }
+                memLimit.addNewObjSize(t.getMemorySize());
             }
         } else {
             // above cacheLimit, spill to disk

Modified: pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Thu Sep 15 17:51:27 2011
@@ -26,7 +26,6 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -37,9 +36,9 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigCounters;
-import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
 
 
 
@@ -54,6 +53,8 @@ import org.apache.pig.backend.hadoop.exe
  * This bag spills pro-actively when the number of tuples in memory 
  * reaches a limit
  */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class InternalDistinctBag extends SortedSpillBag {
 
     /**
@@ -65,21 +66,18 @@ public class InternalDistinctBag extends
 
     private static TupleFactory gTupleFactory = TupleFactory.getInstance();
     
-    private transient boolean mReadStarted = false;
-    
-    private transient int cacheLimit;
-    private transient long maxMemUsage;
-    private transient long memUsage;    
+    private transient boolean mReadStarted = false; 
     
     public InternalDistinctBag() {
-        this(1, -1.0);
+        this(1, -1.0f);
     }
     
     public InternalDistinctBag(int bagCount) {        
-    	this(bagCount, -1.0);
+    	this(bagCount, -1.0f);
     }
     
-    public InternalDistinctBag(int bagCount, double percent) {        
+    public InternalDistinctBag(int bagCount, float percent) {        
+        super(bagCount, percent);
         if (percent < 0) {
         	percent = 0.2F;            
         	if (PigMapReduce.sJobConfInternal.get() != null) {
@@ -95,16 +93,6 @@ public class InternalDistinctBag extends
 
     private void init(int bagCount, double percent) {
     	mContents = new HashSet<Tuple>();      
-    	 
-    	long max = Runtime.getRuntime().maxMemory();
-        maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
-        cacheLimit = Integer.MAX_VALUE;
-        
-        // set limit to 0, if memusage is 0 or really really small.
-        // then all tuples are put into disk
-        if (maxMemUsage < 1) {
-        	cacheLimit = 0;
-        }        
     }
     
     public boolean isSorted() {
@@ -144,7 +132,7 @@ public class InternalDistinctBag extends
             throw new IllegalStateException("InternalDistinctBag is closed for adding new tuples");
         }
                 
-    	if (mContents.size() > cacheLimit) {    		
+    	if (mContents.size() > memLimit.getCacheLimit()) {    		
     		proactive_spill(null);
     	}
     	            	
@@ -154,12 +142,7 @@ public class InternalDistinctBag extends
         	 // check how many tuples memory can hold by getting average
             // size of first 100 tuples
             if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())) {
-                memUsage += t.getMemorySize();
-                long avgUsage = memUsage / (long)mContents.size();
-                if (avgUsage >0) {
-                	cacheLimit = (int)(maxMemUsage / avgUsage);
-                	log.debug("Memory can hold "+ cacheLimit + " records.");
-                }
+                memLimit.addNewObjSize(t.getMemorySize());
             }          
         }    	
     }

Modified: pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Thu Sep 15 17:51:27 2011
@@ -64,10 +64,6 @@ public class InternalSortedBag extends S
 
     private transient Comparator<Tuple> mComp;
     private transient boolean mReadStarted = false;
-    
-    private transient int cacheLimit;
-    private transient long maxMemUsage;
-    private transient long memUsage;    
 
     static private class DefaultComparator implements Comparator<Tuple> {
         @SuppressWarnings("unchecked")
@@ -93,20 +89,11 @@ public class InternalSortedBag extends S
     }
 
     public InternalSortedBag(int bagCount, Comparator<Tuple> comp) {
-    	this(bagCount, -1.0, comp);
+    	this(bagCount, -1.0f, comp);
     }
     
-    public InternalSortedBag(int bagCount, double percent, Comparator<Tuple> comp) {
-    	if (percent < 0) {
-        	percent = 0.2F;            
-        	if (PigMapReduce.sJobConfInternal.get() != null) {
-        		String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
-        		if (usage != null) {
-        			percent = Float.parseFloat(usage);
-        		}
-        	}
-        }
-           	
+    public InternalSortedBag(int bagCount, float percent, Comparator<Tuple> comp) {
+        super(bagCount, percent);
     	init(bagCount, percent, comp);
     }
     
@@ -116,19 +103,8 @@ public class InternalSortedBag extends S
      */
     private void init(int bagCount, double percent, Comparator<Tuple> comp) {
         mComp = (comp == null) ? new DefaultComparator() : comp;
-
-        
     	mContents = new ArrayList<Tuple>();             
-             	 
-    	long max = Runtime.getRuntime().maxMemory();
-        maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
-        cacheLimit = Integer.MAX_VALUE;
-        
-        // set limit to 0, if memusage is 0 or really really small.
-        // then all tuples are put into disk
-        if (maxMemUsage < 1) {
-        	cacheLimit = 0;
-        }        
+     
     }
     
     public void add(Tuple t) {
@@ -136,7 +112,7 @@ public class InternalSortedBag extends S
             throw new IllegalStateException("InternalSortedBag is closed for adding new tuples");
         }
                 
-    	if (mContents.size() > cacheLimit) {    		
+    	if (mContents.size() > memLimit.getCacheLimit()) {    		
     		proactive_spill(mComp);
     	}
     	        
@@ -146,11 +122,7 @@ public class InternalSortedBag extends S
         // size of first 100 tuples
         if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())&&t!=null)
         {
-            memUsage += t.getMemorySize();
-            long avgUsage = memUsage / (long)mContents.size();
-            if (avgUsage >0) {
-            	cacheLimit = (int)(maxMemUsage / avgUsage);
-            }
+            memLimit.addNewObjSize(t.getMemorySize());
         }
                 
         mSize++;

Added: pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SelfSpillBag.java?rev=1171195&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SelfSpillBag.java (added)
+++ pig/trunk/src/org/apache/pig/data/SelfSpillBag.java Thu Sep 15 17:51:27 2011
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * Class to hold code common to self spilling bags such as InternalCachedBag
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class SelfSpillBag extends DefaultAbstractBag {
+    private static final long serialVersionUID = 1L;
+    protected MemoryLimits memLimit;
+
+    public SelfSpillBag(int bagCount) {
+        memLimit = new MemoryLimits(bagCount, -1);
+    }
+
+    public SelfSpillBag(int bagCount, float percent) {
+        memLimit = new MemoryLimits(bagCount, percent);
+    }
+
+    /**
+     * This class helps to compute the number of entries that should be held in
+     * memory so that memory consumption is limited. The memory limit is
+     * computed using the percentage of max memory that the user of this class
+     * is allowed to use, and number of similar objects that share this limit.
+     * The number of objects that will fit into this memory limit is computed
+     * using the average memory size of the objects whose size is given to this
+     * class.
+     */
+    @InterfaceAudience.Private
+    @InterfaceStability.Evolving
+    public static class MemoryLimits {
+
+        public static final String PROP_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
+        private long maxMemUsage;
+        private int cacheLimit = Integer.MAX_VALUE;
+        private long memUsage = 0;
+        private long numObjsSizeChecked = 0;
+
+        /**
+         * @param bagCount
+         * @param percent
+         */
+        public MemoryLimits(int bagCount, float percent) {
+            init(bagCount, percent);
+        }
+
+        private void init(int bagCount, float percent) {
+
+            if (percent < 0) {
+                percent = 0.2F;
+                if (PigMapReduce.sJobConfInternal.get() != null) {
+                    String usage = PigMapReduce.sJobConfInternal.get().get(
+                            PROP_CACHEDBAG_MEMUSAGE);
+                    if (usage != null) {
+                        percent = Float.parseFloat(usage);
+                    }
+                }
+            }
+
+            long max = Runtime.getRuntime().maxMemory();
+            maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
+
+            // set limit to 0, if memusage is 0 or really really small.
+            // then all tuples are put into disk
+            if (maxMemUsage < 1) {
+                cacheLimit = 0;
+            }
+        }
+
+        /**
+         * Computes the number of objects that would fit into memory based on
+         * the memory limit and average size of each object.
+         * 
+         * @return number of objects limit
+         */
+        public int getCacheLimit() {
+            if (numObjsSizeChecked > 0) {
+                long avgUsage = memUsage / numObjsSizeChecked;
+                if (avgUsage > 0) {
+                    cacheLimit = (int) (maxMemUsage / avgUsage);
+                }
+            }
+            return cacheLimit;
+        }
+
+        /**
+         * Submit information about size of another object
+         * 
+         * @param memorySize
+         */
+        public void addNewObjSize(long memorySize) {
+            memUsage += memorySize;
+            ++numObjsSizeChecked;
+        }
+
+        /**
+         * @return the size of
+         */
+        public long getNumObjectsSizeAdded() {
+            return numObjsSizeChecked;
+        }
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/data/SizeUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SizeUtil.java?rev=1171195&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SizeUtil.java (added)
+++ pig/trunk/src/org/apache/pig/data/SizeUtil.java Thu Sep 15 17:51:27 2011
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * Utility functions for estimating size of objects of pig types
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SizeUtil {
+
+    private static final int MAP_MEM_PER_ENTRY = 32 + 120;
+
+    public static long getPigObjMemSize(Object o) {
+        // 12 is added to each to account for the object overhead and the
+        // pointer in the tuple.
+        switch (DataType.findType(o)) {
+        case DataType.BYTEARRAY: {
+            byte[] bytes = ((DataByteArray) o).get();
+            // bytearray size including rounding to 8 bytes
+            long byte_array_sz = roundToEight(bytes.length + 12);
+
+            return byte_array_sz + 16 /* 16 is additional size of DataByteArray */;
+        }
+
+        case DataType.CHARARRAY: {
+            String s = (String) o;
+            // See PIG-1443 for a reference for this formula
+            return roundToEight((s.length() * 2) + 38);
+        }
+
+        case DataType.TUPLE: {
+            Tuple t = (Tuple) o;
+            return t.getMemorySize();
+        }
+
+        case DataType.BAG: {
+            DataBag b = (DataBag) o;
+            return b.getMemorySize();
+        }
+
+        case DataType.INTEGER:
+            return 4 + 8 + 4/* +4 to round to 8 bytes */;
+
+        case DataType.LONG:
+            return 8 + 8;
+
+        case DataType.MAP: {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> m = (Map<String, Object>) o;
+            Iterator<Map.Entry<String, Object>> i = m.entrySet().iterator();
+            long sum = 0;
+            while (i.hasNext()) {
+                Entry<String, Object> entry = i.next();
+                sum += getMapEntrySize(entry.getKey(), entry.getValue());
+            }
+            return sum;
+        }
+
+        case DataType.FLOAT:
+            return 4 + 8 + 4/* +4 to round to 8 bytes */;
+
+        case DataType.DOUBLE:
+            return 8 + 8;
+
+        case DataType.BOOLEAN:
+            // boolean takes 1 byte , +7 to round it to 8
+            return 1 + 8 + 7;
+
+        case DataType.NULL:
+            return 0;
+
+        default:
+            // ??
+            return 12;
+        }
+    }
+
+    public static long getMapEntrySize(Object key, Object value) {
+        // based on experiments on 32 bit Java HotSpot VM
+        // size of map with 0 entries is 120 bytes
+        // each additional entry have around 24 bytes overhead at
+        // small number of entries. At larger number of entries, the
+        // overhead is around 32 bytes, probably because of the expanded
+        // data structures in anticapation of more entries being added
+        return getPigObjMemSize(key) + getPigObjMemSize(value)
+                + MAP_MEM_PER_ENTRY;
+    }
+
+    /**
+     * Memory size of objects are rounded to multiple of 8 bytes
+     * 
+     * @param i
+     * @return i rounded to a equal of higher multiple of 8
+     */
+    public static long roundToEight(long i) {
+        return 8 * ((i + 7) / 8); // integer division rounds the result down
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/data/SortedSpillBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SortedSpillBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/SortedSpillBag.java Thu Sep 15 17:51:27 2011
@@ -24,15 +24,23 @@ import java.util.Comparator;
 
 import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
 
 /**
  * Common functionality for proactively spilling bags that need to keep the data
  * sorted. 
  */
-public abstract class SortedSpillBag extends DefaultAbstractBag {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class SortedSpillBag extends SelfSpillBag {
 
     private static final long serialVersionUID = 1L;
 
+    SortedSpillBag(int bagCount, float percent){
+        super(bagCount, percent);
+    }
+
     /**
      * Sort contents of mContents and write them to disk
      * @param comp Comparator to sort contents of mContents

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Sep 15 17:51:27 2011
@@ -58,6 +58,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -150,7 +151,8 @@ public class ScriptState {
         LIMIT,
         UNION,
         COMBINER,
-        NATIVE;
+        NATIVE,
+        MAP_PARTIALAGG;
     };
     
     /**
@@ -607,7 +609,13 @@ public class ScriptState {
         @Override
         public void visitDemux(PODemux demux) throws VisitorException {
             feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());         
-        }        
+        }
+        
+        @Override
+        public void visitPartialAgg(POPartialAgg partAgg){
+            feature.set(PIG_FEATURE.MAP_PARTIALAGG.ordinal());
+        }
+        
     }    
     
     static class LogicalPlanFeatureVisitor extends LogicalRelationalNodesVisitor {

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Thu Sep 15 17:51:27 2011
@@ -507,10 +507,81 @@ c = load ':INPATH:/singlefile/studenttab
 d = cogroup b by group, c by name;
 e = foreach d generate flatten(group), SUM(c.gpa), COUNT(c.name);
 store e into ':OUTPATH:';\,
-			}
+			},
 			],
 		},
 		{
+        'name' => 'MapPartialAgg',
+        'tests' => [
+                    {
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = group a by name;
+c = foreach b generate group, COUNT(a.age);
+store c into ':OUTPATH:';\,
+             'java_params' => ['-Dpig.exec.mapPartAgg=true']
+            },
+            {
+            #multiquery with group in one sub query
+            'num' => 2,
+            'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); 
+                        b = filter a by age < 22; store b into ':OUTPATH:.1'; 
+                        c = group b by age; 
+                        d = foreach c generate group, SUM(b.gpa);
+                        store d into ':OUTPATH:.2'; #,
+            'java_params' => ['-Dpig.exec.mapPartAgg=true']
+            
+            },
+            {
+             #multi query with two group on diff columns
+            'num' => 3,
+            'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); 
+                        g1 = group a by name;
+                        f1 = foreach g1 generate group as name, MAX(a.gpa);
+                        store f1 into ':OUTPATH:.1'; 
+                        g2 = group a by age;
+                        f2 = foreach g2 generate group as age, AVG(a.gpa);
+                        store f2 into ':OUTPATH:.2'; #,
+            'java_params' => ['-Dpig.exec.mapPartAgg=true']
+            
+            },
+            {
+             #multi query with three groups on diff columns, group key being an expression
+            'num' => 4,
+            'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); 
+                        g1 = group a by name;
+                        f1 = foreach g1 generate group as name, MAX(a.gpa);
+                        store f1 into ':OUTPATH:.1'; 
+                        g2 = group a by age%10;
+                        f2 = foreach g2 generate group as age_mod10, AVG(a.gpa);
+                        store f2 into ':OUTPATH:.2'; 
+                        g3 = group a by age;
+                        f3 = foreach g3 generate group%10, AVG(a.gpa);
+                        store f3 into ':OUTPATH:.3';                         
+                        g4 = group a by gpa;
+                        f4 = foreach g4 generate group as gpa, COUNT(a);
+                        store f4 into ':OUTPATH:.4';                                                 
+                        
+                        #,
+            'java_params' => ['-Dpig.exec.mapPartAgg=true']
+            
+            },
+            {
+            #aggregation gets more than one tuple for every tuple from load func 
+            
+            'num' => 5,
+            'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); 
+                        b = foreach a generate name, age, gpa, flatten(TOBAG(age,age)) as x;
+                        c = group b by age; 
+                        d = foreach c generate group, AVG(b.gpa);
+                        store d into ':OUTPATH:'; #,
+            'java_params' => ['-Dpig.exec.mapPartAgg=true']
+            
+            },            
+        
+            ],
+        },
+		{
 		'name' => 'EvalFunc',
 		'tests' => [
 			{

Modified: pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestDataBag.java Thu Sep 15 17:51:27 2011
@@ -900,7 +900,7 @@ public class TestDataBag extends junit.f
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
         
         // check bag with data written to disk
-        DataBag bg3 = new InternalSortedBag(1, 0.0, null);
+        DataBag bg3 = new InternalSortedBag(1, 0.0f, null);
         tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
         for (int i = 0; i < tupleContents.length; i++) {
             bg3.add(Util.createTuple(tupleContents[i]));
@@ -917,7 +917,7 @@ public class TestDataBag extends junit.f
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
         
-        DataBag bg4 = new InternalSortedBag(1, 0.0, null);
+        DataBag bg4 = new InternalSortedBag(1, 0.0f, null);
         bg4.add(iter.next());
         bg4.add(iter.next());
         assertTrue(iter.hasNext());
@@ -1022,7 +1022,7 @@ public class TestDataBag extends junit.f
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
         
         // check bag with data written to disk
-        DataBag bg3 = new InternalDistinctBag(1, 0.0);
+        DataBag bg3 = new InternalDistinctBag(1, 0.0f);
         tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
         for (int i = 0; i < tupleContents.length; i++) {
             bg3.add(Util.createTuple(tupleContents[i]));
@@ -1036,7 +1036,7 @@ public class TestDataBag extends junit.f
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
         
-        DataBag bg4 = new InternalDistinctBag(1, 0.0);
+        DataBag bg4 = new InternalDistinctBag(1, 0.0f);
         bg4.add(iter.next());
         bg4.add(iter.next());
         assertTrue(iter.hasNext());