You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/03/04 11:23:42 UTC

svn commit: r1452236 - in /pig/branches/branch-0.11: CHANGES.txt src/org/apache/pig/impl/util/SpillableMemoryManager.java

Author: rohini
Date: Mon Mar  4 10:23:42 2013
New Revision: 1452236

URL: http://svn.apache.org/r1452236
Log:
PIG-3148: OutOfMemory exception while spilling stale DefaultDataBag. Extra option to gc() before spilling large bag. (knoguchi via rohini)

Modified:
    pig/branches/branch-0.11/CHANGES.txt
    pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1452236&r1=1452235&r2=1452236&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Mon Mar  4 10:23:42 2013
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-3148: OutOfMemory exception while spilling stale DefaultDataBag. Extra option to gc() before spilling large bag. (knoguchi via rohini)
+
 PIG-3216: Groovy UDFs documentation has minor typos (herberts via rohini)
 
 PIG-3202: CUBE operator not documented in user docs (prasanth_j via billgraham)

Modified: pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1452236&r1=1452235&r2=1452236&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java Mon Mar  4 10:23:42 2013
@@ -77,6 +77,11 @@ public class SpillableMemoryManager impl
     
     // log notification on collection threshold exceeded only the first time
     private boolean firstCollectionThreshExceededLogged = false;
+
+    // fraction of the total heap used for the threshold to determine
+    // if we want to perform an extra gc before the spill
+    private static double extraGCThresholdFraction = 0.05;
+    private static long extraGCSpillSizeThreshold  = 0L;
     
     private static SpillableMemoryManager manager;
 
@@ -85,6 +90,7 @@ public class SpillableMemoryManager impl
         List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
         MemoryPoolMXBean biggestHeap = null;
         long biggestSize = 0;
+        long totalSize = 0;
         for (MemoryPoolMXBean b: mpbeans) {
             log.debug("Found heap (" + b.getName() +
                 ") of type " + b.getType());
@@ -93,12 +99,14 @@ public class SpillableMemoryManager impl
                  * heap is the tenured heap
                  */
                 long size = b.getUsage().getMax();
+                totalSize += size;
                 if (size > biggestSize) {
                     biggestSize = size;
                     biggestHeap = b;
                 }
             }
         }
+        extraGCSpillSizeThreshold  = (long) (totalSize * extraGCThresholdFraction);
         if (biggestHeap == null) {
             throw new RuntimeException("Couldn't find heap");
         }
@@ -224,8 +232,10 @@ public class SpillableMemoryManager impl
             long estimatedFreed = 0;
             int numObjSpilled = 0;
             boolean invokeGC = false;
+            boolean extraGCCalled = false;
             for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
-                Spillable s = i.next().get();
+                WeakReference<Spillable> weakRef = i.next();
+                Spillable s = weakRef.get();
                 // Still need to check for null here, even after we removed
                 // above, because the reference may have gone bad on us
                 // since the last check.
@@ -240,6 +250,27 @@ public class SpillableMemoryManager impl
                     log.debug("spilling small files - getting out of memory handler");
                     break ;
                 }
+                // If single Spillable is bigger than the threshold,
+                // we force GC to make sure we really need to keep this
+                // object before paying for the expensive spill().
+                // Done at most once per handleNotification.
+                if( !extraGCCalled && extraGCSpillSizeThreshold != 0
+                    && toBeFreed > extraGCSpillSizeThreshold   ) {
+                    log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
+                    // this extra assignment to null is needed so that gc can free the
+                    // spillable if nothing else is pointing at it
+                    s = null;
+                    System.gc();
+                    extraGCCalled = true;
+                    // checking again to see if this reference is still valid
+                    s = weakRef.get();
+                    if (s == null) {
+                        i.remove();
+                        accumulatedFreeSize = 0;
+                        invokeGC = false;
+                        continue;
+                    }
+                }
                 s.spill();               
                 numObjSpilled++;
                 estimatedFreed += toBeFreed;