You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this copy.)
Posted to commits@pig.apache.org by ro...@apache.org on 2013/03/04 11:23:42 UTC
svn commit: r1452236 - in /pig/branches/branch-0.11: CHANGES.txt
src/org/apache/pig/impl/util/SpillableMemoryManager.java
Author: rohini
Date: Mon Mar 4 10:23:42 2013
New Revision: 1452236
URL: http://svn.apache.org/r1452236
Log:
PIG-3148: OutOfMemory exception while spilling stale DefaultDataBag. Adds an extra gc() call before spilling a large bag. (knoguchi via rohini)
Modified:
pig/branches/branch-0.11/CHANGES.txt
pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1452236&r1=1452235&r2=1452236&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Mon Mar 4 10:23:42 2013
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-3148: OutOfMemory exception while spilling stale DefaultDataBag. Extra option to gc() before spilling large bag. (knoguchi via rohini)
+
PIG-3216: Groovy UDFs documentation has minor typos (herberts via rohini)
PIG-3202: CUBE operator not documented in user docs (prasanth_j via billgraham)
Modified: pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1452236&r1=1452235&r2=1452236&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/impl/util/SpillableMemoryManager.java Mon Mar 4 10:23:42 2013
@@ -77,6 +77,11 @@ public class SpillableMemoryManager impl
// log notification on collection threshold exceeded only the first time
private boolean firstCollectionThreshExceededLogged = false;
+
+ // fraction of the total heap used for the threshold to determine
+ // if we want to perform an extra gc before the spill
+ private static double extraGCThresholdFraction = 0.05;
+ private static long extraGCSpillSizeThreshold = 0L;
private static SpillableMemoryManager manager;
@@ -85,6 +90,7 @@ public class SpillableMemoryManager impl
List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
MemoryPoolMXBean biggestHeap = null;
long biggestSize = 0;
+ long totalSize = 0;
for (MemoryPoolMXBean b: mpbeans) {
log.debug("Found heap (" + b.getName() +
") of type " + b.getType());
@@ -93,12 +99,14 @@ public class SpillableMemoryManager impl
* heap is the tenured heap
*/
long size = b.getUsage().getMax();
+ totalSize += size;
if (size > biggestSize) {
biggestSize = size;
biggestHeap = b;
}
}
}
+ extraGCSpillSizeThreshold = (long) (totalSize * extraGCThresholdFraction);
if (biggestHeap == null) {
throw new RuntimeException("Couldn't find heap");
}
@@ -224,8 +232,10 @@ public class SpillableMemoryManager impl
long estimatedFreed = 0;
int numObjSpilled = 0;
boolean invokeGC = false;
+ boolean extraGCCalled = false;
for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
- Spillable s = i.next().get();
+ WeakReference<Spillable> weakRef = i.next();
+ Spillable s = weakRef.get();
// Still need to check for null here, even after we removed
// above, because the reference may have gone bad on us
// since the last check.
@@ -240,6 +250,27 @@ public class SpillableMemoryManager impl
log.debug("spilling small files - getting out of memory handler");
break ;
}
+ // If single Spillable is bigger than the threshold,
+ // we force GC to make sure we really need to keep this
+ // object before paying for the expensive spill().
+ // Done at most once per handleNotification.
+ if( !extraGCCalled && extraGCSpillSizeThreshold != 0
+ && toBeFreed > extraGCSpillSizeThreshold ) {
+ log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
+ // this extra assignment to null is needed so that gc can free the
+ // spillable if nothing else is pointing at it
+ s = null;
+ System.gc();
+ extraGCCalled = true;
+ // checking again to see if this reference is still valid
+ s = weakRef.get();
+ if (s == null) {
+ i.remove();
+ accumulatedFreeSize = 0;
+ invokeGC = false;
+ continue;
+ }
+ }
s.spill();
numObjSpilled++;
estimatedFreed += toBeFreed;