You are viewing a plain-text version of this message; the link to its canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by da...@apache.org on 2015/05/24 00:59:46 UTC
svn commit: r1681398 - in /pig/branches/branch-0.15: CHANGES.txt
src/org/apache/pig/impl/util/SpillableMemoryManager.java
Author: daijy
Date: Sat May 23 22:59:46 2015
New Revision: 1681398
URL: http://svn.apache.org/r1681398
Log:
PIG-4564: Pig can deadlock in POPartialAgg if there is a bag
Modified:
pig/branches/branch-0.15/CHANGES.txt
pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1681398&r1=1681397&r2=1681398&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Sat May 23 22:59:46 2015
@@ -74,6 +74,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4564: Pig can deadlock in POPartialAgg if there is a bag (rohini via daijy)
+
PIG-4569: Fix e2e test Rank_1 failure (rohini)
PIG-4490: MIN/MAX builtin UDFs return wrong results when accumulating for strings (xplenty via rohini)
Modified: pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1681398&r1=1681397&r2=1681398&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java Sat May 23 22:59:46 2015
@@ -50,7 +50,11 @@ public class SpillableMemoryManager impl
private final Log log = LogFactory.getLog(getClass());
- LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+ private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+ // References to spillables with size
+ private LinkedList<SpillablePtr> spillablesSR = null;
+
+ private final Object spillLock = new Object(); // monitor serializing concurrent handleNotification spills
// if we freed at least this much, invoke GC
// (default 40 MB - this can be overridden by user supplied property)
@@ -62,15 +66,15 @@ public class SpillableMemoryManager impl
// this will keep track of memory freed across spills
// and between GC invocations
- private static long accumulatedFreeSize = 0L;
+ private long accumulatedFreeSize = 0L;
// fraction of biggest heap for which we want to get
// "memory usage threshold exceeded" notifications
- private static double memoryThresholdFraction = 0.7;
+ private double memoryThresholdFraction = 0.7;
// fraction of biggest heap for which we want to get
// "collection threshold exceeded" notifications
- private static double collectionMemoryThresholdFraction = 0.5;
+ private double collectionMemoryThresholdFraction = 0.5;
// log notification on usage threshold exceeded only the first time
private boolean firstUsageThreshExceededLogged = false;
@@ -80,10 +84,12 @@ public class SpillableMemoryManager impl
// fraction of the total heap used for the threshold to determine
// if we want to perform an extra gc before the spill
- private static double extraGCThresholdFraction = 0.05;
- private static long extraGCSpillSizeThreshold = 0L;
+ private double extraGCThresholdFraction = 0.05;
+ private long extraGCSpillSizeThreshold = 0L;
+
+ private volatile boolean blockRegisterOnSpill = false;
- private static volatile SpillableMemoryManager manager;
+ private static volatile SpillableMemoryManager manager = new SpillableMemoryManager();
private SpillableMemoryManager() {
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
@@ -129,9 +135,6 @@ public class SpillableMemoryManager impl
}
public static SpillableMemoryManager getInstance() {
- if (manager == null) {
- manager = new SpillableMemoryManager();
- }
return manager;
}
@@ -187,119 +190,135 @@ public class SpillableMemoryManager impl
}
}
- clearSpillables();
if (toFree < 0) {
log.debug("low memory handler returning " +
"because there is nothing to free");
return;
}
- synchronized(spillables) {
- Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {
- /**
- * We don't lock anything, so this sort may not be stable if a WeakReference suddenly
- * becomes null, but it will be close enough.
- * Also between the time we sort and we use these spillables, they
- * may actually change in size - so this is just best effort
- */
- @Override
- public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
- Spillable o1 = o1Ref.get();
- Spillable o2 = o2Ref.get();
- if (o1 == null && o2 == null) {
- return 0;
- }
- if (o1 == null) {
- return 1;
+ // Use a separate spillLock to block multiple handleNotification calls
+ synchronized (spillLock) {
+ synchronized(spillables) {
+ spillablesSR = new LinkedList<SpillablePtr>();
+ for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
+ Spillable s = i.next().get();
+ if (s == null) {
+ i.remove();
+ continue;
}
- if (o2 == null) {
+ // Create a list with spillable size for stable sorting. Refer PIG-4012
+ spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
+ }
+ log.debug("Spillables list size: " + spillablesSR.size());
+ Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
+ @Override
+ public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
+ // Sort on the sizes captured when spillablesSR was built: live sizes can change
+ // mid-sort (breaking the comparator contract) and get() may already be null.
+ long o1Size = o1Ref.getMemorySize();
+ long o2Size = o2Ref.getMemorySize();
+
+ if (o1Size == o2Size) {
+ return 0;
+ }
+ if (o1Size < o2Size) {
+ return 1;
+ }
return -1;
}
- long o1Size = o1.getMemorySize();
- long o2Size = o2.getMemorySize();
+ });
+ // Block new bags from being registered
+ blockRegisterOnSpill = true;
+ }
- if (o1Size == o2Size) {
- return 0;
- }
- if (o1Size < o2Size) {
- return 1;
- }
- return -1;
- }
- });
- long estimatedFreed = 0;
- int numObjSpilled = 0;
- boolean invokeGC = false;
- boolean extraGCCalled = false;
- for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
- WeakReference<Spillable> weakRef = i.next();
- Spillable s = weakRef.get();
- // Still need to check for null here, even after we removed
- // above, because the reference may have gone bad on us
- // since the last check.
- if (s == null) {
- i.remove();
- continue;
- }
- long toBeFreed = s.getMemorySize();
- log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
- // Don't keep trying if the rest of files are too small
- if (toBeFreed < spillFileSizeThreshold) {
- log.debug("spilling small files - getting out of memory handler");
- break ;
- }
- // If single Spillable is bigger than the threshold,
- // we force GC to make sure we really need to keep this
- // object before paying for the expensive spill().
- // Done at most once per handleNotification.
- // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
- // extraGCSpillSizeThreshold and the data is always strong referenced.
- if( !extraGCCalled && extraGCSpillSizeThreshold != 0
- && toBeFreed > extraGCSpillSizeThreshold && !(s instanceof GroupingSpillable)) {
- log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
- // this extra assignment to null is needed so that gc can free the
- // spillable if nothing else is pointing at it
- s = null;
- System.gc();
- extraGCCalled = true;
- // checking again to see if this reference is still valid
- s = weakRef.get();
+ try {
+ long estimatedFreed = 0;
+ int numObjSpilled = 0;
+ boolean invokeGC = false;
+ boolean extraGCCalled = false;
+ boolean isGroupingSpillable = false;
+ for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
+ SpillablePtr sPtr = i.next();
+ Spillable s = sPtr.get();
+ // Still need to check for null here, even after we removed
+ // above, because the reference may have gone bad on us
+ // since the last check.
if (s == null) {
i.remove();
- accumulatedFreeSize = 0;
- invokeGC = false;
continue;
}
+ long toBeFreed = sPtr.getMemorySize();
+ log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
+ // Don't keep trying if the rest of files are too small
+ if (toBeFreed < spillFileSizeThreshold) {
+ log.debug("spilling small files - getting out of memory handler");
+ break ;
+ }
+ isGroupingSpillable = (s instanceof GroupingSpillable);
+ // If single Spillable is bigger than the threshold,
+ // we force GC to make sure we really need to keep this
+ // object before paying for the expensive spill().
+ // Done at most once per handleNotification.
+ // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
+ // extraGCSpillSizeThreshold and the data is always strong referenced.
+ if( !extraGCCalled && extraGCSpillSizeThreshold != 0
+ && toBeFreed > extraGCSpillSizeThreshold && !isGroupingSpillable) {
+ log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
+ // this extra assignment to null is needed so that gc can free the
+ // spillable if nothing else is pointing at it
+ s = null;
+ System.gc();
+ extraGCCalled = true;
+ // checking again to see if this reference is still valid
+ s = sPtr.get();
+ if (s == null) {
+ i.remove();
+ accumulatedFreeSize = 0;
+ invokeGC = false;
+ continue;
+ }
+ }
+ // Unblock registering of new bags temporarily as aggregation
+ // of POPartialAgg requires new record to be loaded.
+ blockRegisterOnSpill = !isGroupingSpillable;
+ try {
+ s.spill();
+ } finally {
+ blockRegisterOnSpill = true;
+ }
+
+ numObjSpilled++;
+ estimatedFreed += toBeFreed;
+ accumulatedFreeSize += toBeFreed;
+ // This should significantly reduce the number of small files
+ // in case that we have a lot of nested bags
+ if (accumulatedFreeSize > gcActivationSize) {
+ invokeGC = true;
+ }
+
+ if (estimatedFreed > toFree) {
+ log.debug("Freed enough space - getting out of memory handler");
+ invokeGC = true;
+ break;
+ }
}
- s.spill();
- numObjSpilled++;
- estimatedFreed += toBeFreed;
- accumulatedFreeSize += toBeFreed;
- // This should significantly reduce the number of small files
- // in case that we have a lot of nested bags
- if (accumulatedFreeSize > gcActivationSize) {
- invokeGC = true;
+ spillablesSR = null;
+ /* Poke the GC again to see if we successfully freed enough memory */
+ if(invokeGC) {
+ System.gc();
+ // now that we have invoked the GC, reset accumulatedFreeSize
+ accumulatedFreeSize = 0;
}
-
- if (estimatedFreed > toFree) {
- log.debug("Freed enough space - getting out of memory handler");
- invokeGC = true;
- break;
+ if(estimatedFreed > 0){
+ String msg = "Spilled an estimate of " + estimatedFreed +
+ " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
+ log.info(msg);
}
+ } finally {
+ blockRegisterOnSpill = false;
}
- /* Poke the GC again to see if we successfully freed enough memory */
- if(invokeGC) {
- System.gc();
- // now that we have invoked the GC, reset accumulatedFreeSize
- accumulatedFreeSize = 0;
- }
- if(estimatedFreed > 0){
- String msg = "Spilled an estimate of " + estimatedFreed +
- " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
- log.info(msg);
- }
-
}
+
}
public void clearSpillables() {
@@ -321,7 +340,7 @@ public class SpillableMemoryManager impl
* @param s the spillable to track.
*/
public void registerSpillable(Spillable s) {
- synchronized(spillables) {
+ synchronized (spillables) {
// Cleaing the entire list is too expensive. Just trim off the front while
// we can.
WeakReference<Spillable> first = spillables.peek();
@@ -329,7 +348,46 @@ public class SpillableMemoryManager impl
spillables.remove();
first = spillables.peek();
}
+
+ if (blockRegisterOnSpill) {
+ // When the spill is happening we do not want to register new bags
+ // save for exceptions like POPartialAgg. So block here; the spiller
+ // clears blockRegisterOnSpill in its finally block. As a safeguard, give up
+ // after ~5 min (6000 x 50 ms) and only then clear the flag ourselves:
+ // clearing unconditionally could undo a 'true' the spiller just re-set.
+ try {
+ int i = 6000;
+ for (; i > 0 && blockRegisterOnSpill; i--) {
+ Thread.sleep(50);
+ }
+ if (i == 0) {
+ log.warn("Spill took more than 5 mins. This needs investigation");
+ blockRegisterOnSpill = false;
+ }
+ } catch (InterruptedException e) {
+ log.warn("Interrupted exception in registerSpillable while blocked on spill", e);
+ Thread.currentThread().interrupt();
+ }
+ }
spillables.add(new WeakReference<Spillable>(s));
}
}
+
+ private static class SpillablePtr { // sort entry: weak ref plus size captured at creation
+ private final WeakReference<Spillable> spillable; // weak, so this entry never keeps the spillable alive
+ private final long size; // fixed snapshot; never re-read from the spillable
+
+ public SpillablePtr(Spillable p, long s) {
+ spillable = new WeakReference<Spillable>(p);
+ size = s;
+ }
+
+ public Spillable get() { // null once the referent has been cleared by the GC
+ return spillable.get();
+ }
+
+ public long getMemorySize() {
+ return size;
+ }
+ }
}