You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) was not preserved in this copy.
Posted to commits@pig.apache.org by da...@apache.org on 2014/10/17 23:19:52 UTC

svn commit: r1632668 - in /pig/branches/branch-0.14: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java src/org/apache/pig/impl/util/SpillableMemoryManager.java

Author: daijy
Date: Fri Oct 17 21:19:52 2014
New Revision: 1632668

URL: http://svn.apache.org/r1632668
Log:
Rollback PIG-3979: group all performance, garbage collection, and incremental aggregation

Modified:
    pig/branches/branch-0.14/CHANGES.txt
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1632668&r1=1632667&r2=1632668&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Fri Oct 17 21:19:52 2014
@@ -92,8 +92,6 @@ PIG-2834: MultiStorage requires unused c
 
 PIG-4230: Documentation fix: first nested foreach example is incomplete (lbendig via daijy)
 
-PIG-3979: group all performance, garbage collection, and incremental aggregation (ddreyfus via daijy)
-
 PIG-4199: Mapreduce ACLs should be translated to Tez ACLs (rohini)
 
 PIG-4227: Streaming Python UDF handles bag outputs incorrectly (cheolsoo)

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1632668&r1=1632667&r2=1632668&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Fri Oct 17 21:19:52 2014
@@ -63,6 +63,10 @@ public class POPartialAgg extends Physic
     private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
             null);
 
+    // number of records to sample to determine average size used by each
+    // entry in hash map and average seen reduction
+    private static final int NUM_RECS_TO_SAMPLE = 10000;
+
     // We want to avoid massive ArrayList copies as they get big.
     // Array Lists grow by prevSize + prevSize/2. Given default initial size of 10,
     // 9369 is the size of the array after 18 such resizings. This seems like a sufficiently
@@ -96,15 +100,7 @@ public class POPartialAgg extends Physic
     private boolean disableMapAgg = false;
     private boolean sizeReductionChecked = false;
     private boolean inputsExhausted = false;
-    // The doSpill flag is set when spilling is running or needs to run.
-    // It is set by POPartialAgg when its buffers are full after having run aggregations.
-    // The doContingentSpill flag is set when the SpillableMemoryManager is notified
-    // by GC that the runtime is low on memory and the SpillableMemoryManager identifies
-    // the particular buffer as a good spill candidate because it is large. The contingent spill logic tries
-    // to satisfy the memory manager's request for freeing memory by aggregating data
-    // rather than just spilling records to disk. 
     private volatile boolean doSpill = false;
-    private volatile boolean doContingentSpill = false;
     private transient MemoryLimits memLimits;
 
     private transient boolean initialized = false;
@@ -114,8 +110,6 @@ public class POPartialAgg extends Physic
     private int avgTupleSize = 0;
     private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
     private boolean estimatedMemThresholds = false;
-    private long sampleMem;
-    private long sampleSize = 0;
 
 
     public POPartialAgg(OperatorKey k) {
@@ -130,9 +124,6 @@ public class POPartialAgg extends Physic
             disableMapAgg();
         }
         initialized = true;
-        sampleMem = (long) (Runtime.getRuntime().maxMemory() * percent);
-        sampleSize = 0;
-
         SpillableMemoryManager.getInstance().registerSpillable(this);
     }
 
@@ -154,39 +145,12 @@ public class POPartialAgg extends Physic
         }
 
         while (true) {
-            if (!sizeReductionChecked && sampleSize >= sampleMem /*
-                                                                  * numRecsInRawMap
-                                                                  * >=
-                                                                  * NUM_RECS_TO_SAMPLE
-                                                                  */) {
+            if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
                 checkSizeReduction();
-                sampleSize = 0;
             }
-            if (!estimatedMemThresholds && sampleSize >= sampleMem /*
-                                                                    * numRecsInRawMap
-                                                                    * >=
-                                                                    * NUM_RECS_TO_SAMPLE
-                                                                    */) {
+            if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
                 estimateMemThresholds();
             }
-            if (doContingentSpill) {
-                doContingentSpill = false;
-                // Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
-                if (doSpill == false) {
-                    // SpillableMemoryManager requested a spill to reduce memory
-                    // consumption.
-                    // See if we can avoid it.
-                    aggregateFirstLevel();
-                    aggregateSecondLevel();
-                    if (!shouldSpill()) {
-                        LOG.debug("Spill triggered by SpillableMemoryManager suppressed");
-                    } else {
-                        LOG.debug("Spill triggered by SpillableMemoryManager");
-                        doSpill = true;
-                    }
-                }
-
-            }
             if (doSpill) {
                 startSpill();
                 Result result = spillResult();
@@ -211,7 +175,7 @@ public class POPartialAgg extends Physic
                         // parent input is over. flush what we have.
                         inputsExhausted = true;
                         startSpill();
-                        LOG.debug("Spilling last bits.");
+                        LOG.info("Spilling last bits.");
                         continue;
                     } else {
                         return EOP_RESULT;
@@ -230,19 +194,6 @@ public class POPartialAgg extends Physic
                     }
                     Object key = keyRes.result;
                     keyPlan.detachInput();
-                    if (numRecsInRawMap == 0) {
-                        sampleSize = 0;
-                    }
-                    // Collecting the tuple memory size is surprisingly
-                    // expensive.
-                    // don't do it after we have the memory threshold since it's
-                    // no longer used.
-                    if (!estimatedMemThresholds) {
-                        int tupleSize = (int) inpTuple.getMemorySize();
-                        sampleSize += tupleSize;
-                        if (avgTupleSize == 0)
-                            avgTupleSize = tupleSize;
-                    }
                     numRecsInRawMap += 1;
                     addKeyValToMap(rawInputMap, key, inpTuple);
 
@@ -283,13 +234,6 @@ public class POPartialAgg extends Physic
             firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
             secondTierThreshold = (int) (0.5 + totalTuples *  (1f / sizeReduction));
             LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
-            // The second tier should at least allow one tuple before it tries to aggregate.
-            // This code retains the total number of tuples in the buffer while guaranteeing
-            // the second tier has at least one tuple.
-            if (secondTierThreshold == 0) {
-                secondTierThreshold += 1;
-                firstTierThreshold -= 1;
-            }
         }
         estimatedMemThresholds = true;
     }
@@ -299,9 +243,9 @@ public class POPartialAgg extends Physic
         aggregateFirstLevel();
         aggregateSecondLevel();
         int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
-        LOG.debug("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
+        LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
         int minReduction = getMinOutputReductionFromProp();
-        LOG.debug("Observed reduction factor: from " + numBeforeReduction +
+        LOG.info("Observed reduction factor: from " + numBeforeReduction +
                 " to " + numAfterReduction +
                 " => " + numBeforeReduction / numAfterReduction + ".");
         if ( numBeforeReduction / numAfterReduction < minReduction) {
@@ -322,15 +266,15 @@ public class POPartialAgg extends Physic
     }
 
     private boolean shouldAggregateFirstLevel() {
-        if (LOG.isDebugEnabled() && numRecsInRawMap > firstTierThreshold) {
-            LOG.debug("Aggregating " + numRecsInRawMap + " raw records.");
+        if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
+            LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
         }
         return (numRecsInRawMap > firstTierThreshold);
     }
 
     private boolean shouldAggregateSecondLevel() {
-        if (LOG.isDebugEnabled() && numRecsInProcessedMap > secondTierThreshold) {
-            LOG.debug("Aggregating " + numRecsInProcessedMap + " secondary records.");
+        if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
+            LOG.info("Aggregating " + numRecsInProcessedMap + " secondary records.");
         }
         return (numRecsInProcessedMap > secondTierThreshold);
     }
@@ -366,21 +310,21 @@ public class POPartialAgg extends Physic
         if (spillingIterator != null) return;
 
         if (!rawInputMap.isEmpty()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
+            if (LOG.isInfoEnabled()) {
+                LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
             }
             aggregateFirstLevel();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("processed inputs: " + numRecsInProcessedMap + " tuples.");
+            if (LOG.isInfoEnabled()) {
+                LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
             }
         }
         if (!processedInputMap.isEmpty()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
+            if (LOG.isInfoEnabled()) {
+                LOG.info("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
             }
             aggregateSecondLevel();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("processed inputs: " + numRecsInProcessedMap + " tuples.");
+            if (LOG.isInfoEnabled()) {
+                LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
             }
         }
         doSpill = true;
@@ -391,7 +335,7 @@ public class POPartialAgg extends Physic
         // if no more to spill, return EOP_RESULT.
         if (processedInputMap.isEmpty()) {
             spillingIterator = null;
-            LOG.debug("In spillResults(), processed map is empty -- done spilling.");
+            LOG.info("In spillResults(), processed map is empty -- done spilling.");
             return EOP_RESULT;
         } else {
             Map.Entry<Object, List<Tuple>> entry = spillingIterator.next();
@@ -592,7 +536,8 @@ public class POPartialAgg extends Physic
 
     @Override
     public long spill() {
-        doContingentSpill = true;
+        LOG.info("Spill triggered by SpillableMemoryManager");
+        doSpill = true;
         return 0;
     }
 

Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1632668&r1=1632667&r2=1632668&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri Oct 17 21:19:52 2014
@@ -50,9 +50,7 @@ public class SpillableMemoryManager impl
     
     private final Log log = LogFactory.getLog(getClass());
     
-    private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
-    // String references to spillables
-    private LinkedList<SpillablePtr> spillablesSR = null;
+    LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
     
     // if we freed at least this much, invoke GC 
     // (default 40 MB - this can be overridden by user supplied property)
@@ -62,6 +60,10 @@ public class SpillableMemoryManager impl
     // (default 5MB - this can be overridden by user supplied property)
     private static long spillFileSizeThreshold = 5000000L ;
     
+    // this will keep track of memory freed across spills
+    // and between GC invocations
+    private static long accumulatedFreeSize = 0L;
+    
     // fraction of biggest heap for which we want to get
     // "memory usage threshold exceeded" notifications
     private static double memoryThresholdFraction = 0.7;
@@ -75,6 +77,11 @@ public class SpillableMemoryManager impl
     
     // log notification on collection threshold exceeded only the first time
     private boolean firstCollectionThreshExceededLogged = false;
+
+    // fraction of the total heap used for the threshold to determine
+    // if we want to perform an extra gc before the spill
+    private static double extraGCThresholdFraction = 0.05;
+    private static long extraGCSpillSizeThreshold  = 0L;
     
     private static volatile SpillableMemoryManager manager;
 
@@ -83,6 +90,7 @@ public class SpillableMemoryManager impl
         List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
         MemoryPoolMXBean biggestHeap = null;
         long biggestSize = 0;
+        long totalSize = 0;
         for (MemoryPoolMXBean b: mpbeans) {
             log.debug("Found heap (" + b.getName() +
                 ") of type " + b.getType());
@@ -91,12 +99,14 @@ public class SpillableMemoryManager impl
                  * heap is the tenured heap
                  */
                 long size = b.getUsage().getMax();
+                totalSize += size;
                 if (size > biggestSize) {
                     biggestSize = size;
                     biggestHeap = b;
                 }
             }
         }
+        extraGCSpillSizeThreshold  = (long) (totalSize * extraGCThresholdFraction);
         if (biggestHeap == null) {
             throw new RuntimeException("Couldn't find heap");
         }
@@ -179,7 +189,6 @@ public class SpillableMemoryManager impl
             }
 
         }
-        // Remove empty spillables to improve sort speed.
         clearSpillables();
         if (toFree < 0) {
             log.debug("low memory handler returning " + 
@@ -187,31 +196,16 @@ public class SpillableMemoryManager impl
             return;
         }
         synchronized(spillables) {
-            /**
-             * Store a reference to a spillable and its size into a stable
-             * list so that the sort is stable (a Java 7 contract).
-             * Between the time we sort and we use these spillables, they
-             * may actually change in size.
-             */
-            spillablesSR = new LinkedList<SpillablePtr>();
-            for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
-                // Check that the object still exists before adding to the Strong Referenced list.
-                Spillable s = i.next().get();
-                if (s == null) {
-                    i.remove();
-                    continue;
-                }
-                // Get a ptr to the spillable and its current size.
-                // we need a stable size for sorting.
-                spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
-            }
-            log.debug("Spillables list size: " + spillablesSR.size());
-            Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
-                // Sort the list in descending order. We spill the biggest items first,
-                // and only as many as we need to to reduce memory usage
-                // below the threshold.
+            Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {
+
+                /**
+                 * We don't lock anything, so this sort may not be stable if a WeakReference suddenly
+                 * becomes null, but it will be close enough.
+                 * Also between the time we sort and we use these spillables, they
+                 * may actually change in size - so this is just best effort
+                 */    
                 @Override
-                public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
+                public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
                     Spillable o1 = o1Ref.get();
                     Spillable o2 = o2Ref.get();
                     if (o1 == null && o2 == null) {
@@ -237,64 +231,77 @@ public class SpillableMemoryManager impl
             });
             long estimatedFreed = 0;
             int numObjSpilled = 0;
-            /*
-             * Before PIG-3979, Pig invoke System.gc inside this hook,
-             * but calling gc from within a gc notification seems to cause a lot of gc activity.
-             * More accurately, on Java implementations in which System.gc() does force a gc,
-             * this causes a lot of looping. On systems in which System.gc() is just a suggestion,
-             * relying on the outcome of the System.gc() call is a mistake.
-             *
-             * Therefore, this version of the code does away with attempts to guess what happens
-             * what we call s.spill(). For POPartionAgg spillables, the call tells the spillable
-             * to reduce itself. No data is necessarily written to disk. 
-             */
-            for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
-                SpillablePtr sPtr = i.next();
-                long toBeFreed = sPtr.getMemorySize();
+            boolean invokeGC = false;
+            boolean extraGCCalled = false;
+            for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
+                WeakReference<Spillable> weakRef = i.next();
+                Spillable s = weakRef.get();
+                // Still need to check for null here, even after we removed
+                // above, because the reference may have gone bad on us
+                // since the last check.
+                if (s == null) {
+                    i.remove();
+                    continue;
+                }
+                long toBeFreed = s.getMemorySize();
                 log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
                 // Don't keep trying if the rest of files are too small
                 if (toBeFreed < spillFileSizeThreshold) {
                     log.debug("spilling small files - getting out of memory handler");
                     break ;
                 }
-                Spillable s = sPtr.get();
-                if (s != null)
-                    s.spill();
+                // If single Spillable is bigger than the threshold,
+                // we force GC to make sure we really need to keep this
+                // object before paying for the expensive spill().
+                // Done at most once per handleNotification.
+                if( !extraGCCalled && extraGCSpillSizeThreshold != 0
+                    && toBeFreed > extraGCSpillSizeThreshold   ) {
+                    log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
+                    // this extra assignment to null is needed so that gc can free the
+                    // spillable if nothing else is pointing at it
+                    s = null;
+                    System.gc();
+                    extraGCCalled = true;
+                    // checking again to see if this reference is still valid
+                    s = weakRef.get();
+                    if (s == null) {
+                        i.remove();
+                        accumulatedFreeSize = 0;
+                        invokeGC = false;
+                        continue;
+                    }
+                }
+                s.spill();               
                 numObjSpilled++;
                 estimatedFreed += toBeFreed;
-
+                accumulatedFreeSize += toBeFreed;
+                // This should significantly reduce the number of small files
+                // in case that we have a lot of nested bags
+                if (accumulatedFreeSize > gcActivationSize) {
+                    invokeGC = true;
+                }
+                
                 if (estimatedFreed > toFree) {
                     log.debug("Freed enough space - getting out of memory handler");
+                    invokeGC = true;
                     break;
                 }
+            }           
+            /* Poke the GC again to see if we successfully freed enough memory */
+            if(invokeGC) {
+                System.gc();
+                // now that we have invoked the GC, reset accumulatedFreeSize
+                accumulatedFreeSize = 0;
             }
-            // We are done with the strongly referenced list of spillables
-            spillablesSR = null;
-
-            if (estimatedFreed > 0) {
+            if(estimatedFreed > 0){
                 String msg = "Spilled an estimate of " + estimatedFreed +
                 " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
-                log.debug(msg);
+                log.info(msg);
             }
 
         }
     }
     
-    public static class SpillablePtr {
-        private WeakReference<Spillable> spillable;
-        private long size;
-        SpillablePtr(Spillable p, long s) {
-            spillable = new WeakReference<Spillable>(p);
-            size = s;
-        }
-        public Spillable get() {
-            return spillable.get();
-        }
-        public long getMemorySize() {
-            return size;
-        }
-    }
-
     public void clearSpillables() {
         synchronized (spillables) {
             // Walk the list first and remove nulls, otherwise the sort
@@ -315,7 +322,7 @@ public class SpillableMemoryManager impl
      */
     public void registerSpillable(Spillable s) {
         synchronized(spillables) {
-            // Cleaning the entire list is too expensive.  Just trim off the front while
+            // Cleaing the entire list is too expensive.  Just trim off the front while
             // we can.
             WeakReference<Spillable> first = spillables.peek();
             while (first != null && first.get() == null) {