You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2012/12/07 00:46:24 UTC

svn commit: r1418135 - in /pig/branches/branch-0.11: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java src/org/apache/pig/data/DefaultAbstractBag.java

Author: dvryaboy
Date: Thu Dec  6 23:46:24 2012
New Revision: 1418135

URL: http://svn.apache.org/viewvc?rev=1418135&view=rev
Log:
PIG-3044: Trigger POPartialAgg compaction under GC pressure 

Modified:
    pig/branches/branch-0.11/CHANGES.txt
    pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java

Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1418135&r1=1418134&r2=1418135&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Thu Dec  6 23:46:24 2012
@@ -30,6 +30,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-3044: Trigger POPartialAgg compaction under GC pressure (dvryaboy)
+
 PIG-2907: Publish pig jars for Hadoop2/23 to maven (rohini)
 
 PIG-2934: HBaseStorage filter optimizations (billgraham)

Modified: pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1418135&r1=1418134&r2=1418135&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Thu Dec  6 23:46:24 2012
@@ -44,6 +44,9 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Spillable;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+
 import com.google.common.collect.Maps;
 
 /**
@@ -53,7 +56,7 @@ import com.google.common.collect.Maps;
  * map. Once that map fills up or all input has been seen, results are
  * piped out into the next operator (caller of getNext()).
  */
-public class POPartialAgg extends PhysicalOperator {
+public class POPartialAgg extends PhysicalOperator implements Spillable {
     private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
     private static final long serialVersionUID = 1L;
 
@@ -104,9 +107,10 @@ public class POPartialAgg extends Physic
     private transient boolean initialized = false;
     private int firstTierThreshold = FIRST_TIER_THRESHOLD;
     private int secondTierThreshold = SECOND_TIER_THRESHOLD;
-    private int sizeReduction;
+    private int sizeReduction = 1;
+    private int avgTupleSize = 0;
     private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
-    private boolean estimatedMemThresholds;
+    private boolean estimatedMemThresholds = false;
 
 
     public POPartialAgg(OperatorKey k) {
@@ -121,6 +125,7 @@ public class POPartialAgg extends Physic
             disableMapAgg();
     }
         initialized = true;
+        SpillableMemoryManager.getInstance().registerSpillable(this);
     }
 
     @Override
@@ -222,6 +227,7 @@ public class POPartialAgg extends Physic
                     memLimits.addNewObjSize(mem);
                     }
             }
+            avgTupleSize = estTotalMem / estTuples;
             int totalTuples = memLimits.getCacheLimit();
             LOG.info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
             firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
@@ -287,8 +293,8 @@ public class POPartialAgg extends Physic
        value.add(inpTuple);
        if (value.size() >= MAX_LIST_SIZE) {
            boolean isFirst = (map == rawInputMap);
-           if (LOG.isInfoEnabled()){
-               LOG.info("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
+            if (LOG.isDebugEnabled()){
+                LOG.debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
            }
            if (isFirst) {
                aggregateRawRow(key);
@@ -526,4 +532,38 @@ public class POPartialAgg extends Physic
         }
     }
 
+    /**
+     * Spill request entry point. This operator registers itself with the
+     * {@link SpillableMemoryManager} when it is initialized; this method logs
+     * the request and delegates to {@code startSpill()}.
+     *
+     * @return always 0; the amount released is not computed here
+     * @throws RuntimeException wrapping any {@link ExecException} thrown by
+     *         {@code startSpill()}
+     */
+    @Override
+    public long spill() {
+        try {
+            LOG.info("Spill triggered by SpillableMemoryManager");
+            startSpill();
+        } catch (ExecException e) {
+            throw new RuntimeException(e);
+        }
+        return 0;
+    }
+
+    /**
+     * Estimates the memory held by the buffered records as the average tuple
+     * size (in bytes) times the sum of the {@code numRecsInProcessedMap} and
+     * {@code numRecsInRawMap} counters.
+     *
+     * @return the estimated size in bytes; 0 while {@code avgTupleSize} is
+     *         still at its initial value of 0
+     */
+    @Override
+    public long getMemorySize() {
+        // Widen to long before adding and multiplying so that a large buffer
+        // cannot overflow int arithmetic and produce a bogus estimate.
+        return (long) avgTupleSize * ((long) numRecsInProcessedMap + numRecsInRawMap);
+    }
+
 }

Modified: pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1418135&r1=1418134&r2=1418135&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java Thu Dec  6 23:46:24 2012
@@ -67,7 +67,7 @@ public abstract class DefaultAbstractBag
 
     protected int mLastContentsSize = -1;
 
-    protected long mMemSize = 0;
+    protected long avgTupleSize = 0;
 
     private boolean spillableRegistered = false;
 
@@ -130,33 +130,40 @@ public abstract class DefaultAbstractBag
      */
     @Override
     public long getMemorySize() {
-        int j;
+        int j = 0;
         int numInMem = 0;
-        long used = 0;
 
         synchronized (mContents) {
-            if (mLastContentsSize == mContents.size()) return mMemSize;
+            numInMem = mContents.size();
+
+            // If we've already gotten the estimate
+            // and the number of tuples hasn't changed, or was above 100 and
+            // is still above 100, we can
+            // produce a new estimate without sampling the tuples again.
+            if (avgTupleSize != 0 && (mLastContentsSize == numInMem ||
+                    mLastContentsSize > 100 && numInMem > 100))
+                return totalSizeFromAvgTupleSize(avgTupleSize, numInMem);
 
-            // I can't afford to talk through all the tuples every time the
+            // Measure only what's in memory, not what's on disk.
+            // I can't afford to walk through all the tuples every time the
             // memory manager wants to know if it's time to dump.  Just sample
             // the first 100 and see what we get.  This may not be 100%
             // accurate, but it's just an estimate anyway.
-            numInMem = mContents.size();
-            // Measure only what's in memory, not what's on disk.
+            long sampledBytes = 0; // fresh sum: never seed it with the previous average
             Iterator<Tuple> i = mContents.iterator();
             for (j = 0; i.hasNext() && j < 100; j++) { 
-                used += i.next().getMemorySize();
+                sampledBytes += i.next().getMemorySize();
             }
             mLastContentsSize = numInMem;
+            avgTupleSize = (j == 0) ? 0 : sampledBytes / j; // empty bag: nothing sampled, avoid dividing by zero
         }
 
-        if (numInMem > 100) {
-            // Estimate the per tuple size.  Do it in integer arithmetic
-            // (even though it will be slightly less accurate) for speed.
-            used /= j;
-            used *= numInMem;
+        return totalSizeFromAvgTupleSize(avgTupleSize, numInMem);
         }
 
+    private long totalSizeFromAvgTupleSize(long avgTupleSize, int numInMem) {
+        long used = avgTupleSize * numInMem;
+
         // add up the overhead for this object and other object variables
         int bag_fix_size = 8 /* object header */ 
-        + 4 + 8 + 8 /* mLastContentsSize + mMemSize + mSize */
+        + 4 + 8 + 8 /* mLastContentsSize + avgTupleSize + mSize */
@@ -185,11 +192,9 @@ public abstract class DefaultAbstractBag
                 used += mSpillFiles.size() * approx_per_entry_size;
             }
         }
-        
-        mMemSize = used;
         return used;
-    }
 
+    }
     
     /**
      * Memory size of objects are rounded to multiple of 8 bytes