You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/05/24 00:59:46 UTC

svn commit: r1681398 - in /pig/branches/branch-0.15: CHANGES.txt src/org/apache/pig/impl/util/SpillableMemoryManager.java

Author: daijy
Date: Sat May 23 22:59:46 2015
New Revision: 1681398

URL: http://svn.apache.org/r1681398
Log:
PIG-4564: Pig can deadlock in POPartialAgg if there is a bag

Modified:
    pig/branches/branch-0.15/CHANGES.txt
    pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1681398&r1=1681397&r2=1681398&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Sat May 23 22:59:46 2015
@@ -74,6 +74,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4564: Pig can deadlock in POPartialAgg if there is a bag (rohini via daijy)
+
 PIG-4569: Fix e2e test Rank_1 failure (rohini)
 
 PIG-4490: MIN/MAX builtin UDFs return wrong results when accumulating for strings (xplenty via rohini)

Modified: pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1681398&r1=1681397&r2=1681398&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/util/SpillableMemoryManager.java Sat May 23 22:59:46 2015
@@ -50,7 +50,11 @@ public class SpillableMemoryManager impl
 
     private final Log log = LogFactory.getLog(getClass());
 
-    LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+    private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+    // References to spillables with size
+    private LinkedList<SpillablePtr> spillablesSR = null;
+
+    private Object spillLock = new Object();
 
     // if we freed at least this much, invoke GC
     // (default 40 MB - this can be overridden by user supplied property)
@@ -62,15 +66,15 @@ public class SpillableMemoryManager impl
 
     // this will keep track of memory freed across spills
     // and between GC invocations
-    private static long accumulatedFreeSize = 0L;
+    private long accumulatedFreeSize = 0L;
 
     // fraction of biggest heap for which we want to get
     // "memory usage threshold exceeded" notifications
-    private static double memoryThresholdFraction = 0.7;
+    private double memoryThresholdFraction = 0.7;
 
     // fraction of biggest heap for which we want to get
     // "collection threshold exceeded" notifications
-    private static double collectionMemoryThresholdFraction = 0.5;
+    private double collectionMemoryThresholdFraction = 0.5;
 
     // log notification on usage threshold exceeded only the first time
     private boolean firstUsageThreshExceededLogged = false;
@@ -80,10 +84,12 @@ public class SpillableMemoryManager impl
 
     // fraction of the total heap used for the threshold to determine
     // if we want to perform an extra gc before the spill
-    private static double extraGCThresholdFraction = 0.05;
-    private static long extraGCSpillSizeThreshold  = 0L;
+    private double extraGCThresholdFraction = 0.05;
+    private long extraGCSpillSizeThreshold  = 0L;
+
+    private volatile boolean blockRegisterOnSpill = false;
 
-    private static volatile SpillableMemoryManager manager;
+    private static volatile SpillableMemoryManager manager = new SpillableMemoryManager();
 
     private SpillableMemoryManager() {
         ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
@@ -129,9 +135,6 @@ public class SpillableMemoryManager impl
     }
 
     public static SpillableMemoryManager getInstance() {
-        if (manager == null) {
-            manager = new SpillableMemoryManager();
-        }
         return manager;
     }
 
@@ -187,119 +190,135 @@ public class SpillableMemoryManager impl
             }
 
         }
-        clearSpillables();
         if (toFree < 0) {
             log.debug("low memory handler returning " +
                 "because there is nothing to free");
             return;
         }
-        synchronized(spillables) {
-            Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {
 
-                /**
-                 * We don't lock anything, so this sort may not be stable if a WeakReference suddenly
-                 * becomes null, but it will be close enough.
-                 * Also between the time we sort and we use these spillables, they
-                 * may actually change in size - so this is just best effort
-                 */
-                @Override
-                public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
-                    Spillable o1 = o1Ref.get();
-                    Spillable o2 = o2Ref.get();
-                    if (o1 == null && o2 == null) {
-                        return 0;
-                    }
-                    if (o1 == null) {
-                        return 1;
+        // Use a separate spillLock to block multiple handleNotification calls
+        synchronized (spillLock) {
+            synchronized(spillables) {
+                spillablesSR = new LinkedList<SpillablePtr>();
+                for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
+                    Spillable s = i.next().get();
+                    if (s == null) {
+                        i.remove();
+                        continue;
                     }
-                    if (o2 == null) {
+                    // Create a list with spillable size for stable sorting. Refer to PIG-4012
+                    spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
+                }
+                log.debug("Spillables list size: " + spillablesSR.size());
+                Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
+                    @Override
+                    public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
+                        Spillable o1 = o1Ref.get();
+                        Spillable o2 = o2Ref.get();
+                        long o1Size = o1.getMemorySize();
+                        long o2Size = o2.getMemorySize();
+
+                        if (o1Size == o2Size) {
+                            return 0;
+                        }
+                        if (o1Size < o2Size) {
+                            return 1;
+                        }
                         return -1;
                     }
-                    long o1Size = o1.getMemorySize();
-                    long o2Size = o2.getMemorySize();
+                });
+                // Block new bags from being registered
+                blockRegisterOnSpill = true;
+            }
 
-                    if (o1Size == o2Size) {
-                        return 0;
-                    }
-                    if (o1Size < o2Size) {
-                        return 1;
-                    }
-                    return -1;
-                }
-            });
-            long estimatedFreed = 0;
-            int numObjSpilled = 0;
-            boolean invokeGC = false;
-            boolean extraGCCalled = false;
-            for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
-                WeakReference<Spillable> weakRef = i.next();
-                Spillable s = weakRef.get();
-                // Still need to check for null here, even after we removed
-                // above, because the reference may have gone bad on us
-                // since the last check.
-                if (s == null) {
-                    i.remove();
-                    continue;
-                }
-                long toBeFreed = s.getMemorySize();
-                log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
-                // Don't keep trying if the rest of files are too small
-                if (toBeFreed < spillFileSizeThreshold) {
-                    log.debug("spilling small files - getting out of memory handler");
-                    break ;
-                }
-                // If single Spillable is bigger than the threshold,
-                // we force GC to make sure we really need to keep this
-                // object before paying for the expensive spill().
-                // Done at most once per handleNotification.
-                // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
-                // extraGCSpillSizeThreshold and the data is always strong referenced.
-                if( !extraGCCalled && extraGCSpillSizeThreshold != 0
-                    && toBeFreed > extraGCSpillSizeThreshold  && !(s instanceof GroupingSpillable)) {
-                    log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
-                    // this extra assignment to null is needed so that gc can free the
-                    // spillable if nothing else is pointing at it
-                    s = null;
-                    System.gc();
-                    extraGCCalled = true;
-                    // checking again to see if this reference is still valid
-                    s = weakRef.get();
+            try {
+                long estimatedFreed = 0;
+                int numObjSpilled = 0;
+                boolean invokeGC = false;
+                boolean extraGCCalled = false;
+                boolean isGroupingSpillable = false;
+                for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
+                    SpillablePtr sPtr = i.next();
+                    Spillable s = sPtr.get();
+                    // Still need to check for null here, even after we removed
+                    // above, because the reference may have gone bad on us
+                    // since the last check.
                     if (s == null) {
                         i.remove();
-                        accumulatedFreeSize = 0;
-                        invokeGC = false;
                         continue;
                     }
+                    long toBeFreed = sPtr.getMemorySize();
+                    log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
+                    // Don't keep trying if the rest of files are too small
+                    if (toBeFreed < spillFileSizeThreshold) {
+                        log.debug("spilling small files - getting out of memory handler");
+                        break ;
+                    }
+                    isGroupingSpillable = (s instanceof GroupingSpillable);
+                    // If single Spillable is bigger than the threshold,
+                    // we force GC to make sure we really need to keep this
+                    // object before paying for the expensive spill().
+                    // Done at most once per handleNotification.
+                    // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
+                    // extraGCSpillSizeThreshold and the data is always strong referenced.
+                    if( !extraGCCalled && extraGCSpillSizeThreshold != 0
+                        && toBeFreed > extraGCSpillSizeThreshold  && !isGroupingSpillable) {
+                        log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
+                        // this extra assignment to null is needed so that gc can free the
+                        // spillable if nothing else is pointing at it
+                        s = null;
+                        System.gc();
+                        extraGCCalled = true;
+                        // checking again to see if this reference is still valid
+                        s = sPtr.get();
+                        if (s == null) {
+                            i.remove();
+                            accumulatedFreeSize = 0;
+                            invokeGC = false;
+                            continue;
+                        }
+                    }
+                    // Unblock registering of new bags temporarily as aggregation
+                    // of POPartialAgg requires new record to be loaded.
+                    blockRegisterOnSpill = !isGroupingSpillable;
+                    try {
+                        s.spill();
+                    } finally {
+                        blockRegisterOnSpill = true;
+                    }
+
+                    numObjSpilled++;
+                    estimatedFreed += toBeFreed;
+                    accumulatedFreeSize += toBeFreed;
+                    // This should significantly reduce the number of small files
+                    // in case that we have a lot of nested bags
+                    if (accumulatedFreeSize > gcActivationSize) {
+                        invokeGC = true;
+                    }
+
+                    if (estimatedFreed > toFree) {
+                        log.debug("Freed enough space - getting out of memory handler");
+                        invokeGC = true;
+                        break;
+                    }
                 }
-                s.spill();
-                numObjSpilled++;
-                estimatedFreed += toBeFreed;
-                accumulatedFreeSize += toBeFreed;
-                // This should significantly reduce the number of small files
-                // in case that we have a lot of nested bags
-                if (accumulatedFreeSize > gcActivationSize) {
-                    invokeGC = true;
+                spillablesSR = null;
+                /* Poke the GC again to see if we successfully freed enough memory */
+                if(invokeGC) {
+                    System.gc();
+                    // now that we have invoked the GC, reset accumulatedFreeSize
+                    accumulatedFreeSize = 0;
                 }
-
-                if (estimatedFreed > toFree) {
-                    log.debug("Freed enough space - getting out of memory handler");
-                    invokeGC = true;
-                    break;
+                if(estimatedFreed > 0){
+                    String msg = "Spilled an estimate of " + estimatedFreed +
+                    " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
+                    log.info(msg);
                 }
+            } finally {
+                blockRegisterOnSpill = false;
             }
-            /* Poke the GC again to see if we successfully freed enough memory */
-            if(invokeGC) {
-                System.gc();
-                // now that we have invoked the GC, reset accumulatedFreeSize
-                accumulatedFreeSize = 0;
-            }
-            if(estimatedFreed > 0){
-                String msg = "Spilled an estimate of " + estimatedFreed +
-                " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
-                log.info(msg);
-            }
-
         }
+
     }
 
     public void clearSpillables() {
@@ -321,7 +340,7 @@ public class SpillableMemoryManager impl
      * @param s the spillable to track.
      */
     public void registerSpillable(Spillable s) {
-        synchronized(spillables) {
+        synchronized (spillables) {
             // Cleaing the entire list is too expensive.  Just trim off the front while
             // we can.
             WeakReference<Spillable> first = spillables.peek();
@@ -329,7 +348,46 @@ public class SpillableMemoryManager impl
                 spillables.remove();
                 first = spillables.peek();
             }
+
+            if (blockRegisterOnSpill) {
+                // While a spill is in progress we do not want to register new bags,
+                // except for special cases such as POPartialAgg, so block here.
+                // blockRegisterOnSpill is reset to false in the finally block after the spill.
+                // As a safeguard against missed corner cases that could cause a deadlock,
+                // wait at most 5 minutes (6000 x 50 ms, assuming even a large spill
+                // completes within that time) instead of blocking indefinitely.
+                try {
+                    int i = 6000;
+                    for (; i > 0 && blockRegisterOnSpill; i--) {
+                        Thread.sleep(50);
+                    }
+                    if (i == 0) {
+                        log.warn("Spill took more than 5 mins. This needs investigation");
+                    }
+                } catch (InterruptedException e) {
+                    log.warn("Interrupted exception in registerSpillable while blocked on spill", e);
+                }
+                blockRegisterOnSpill = false;
+            }
             spillables.add(new WeakReference<Spillable>(s));
         }
     }
+
+    private static class SpillablePtr {
+        private WeakReference<Spillable> spillable;
+        private long size;
+
+        public SpillablePtr(Spillable p, long s) {
+            spillable = new WeakReference<Spillable>(p);
+            size = s;
+        }
+
+        public Spillable get() {
+            return spillable.get();
+        }
+
+        public long getMemorySize() {
+            return size;
+        }
+    }
 }