You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/10/13 02:19:45 UTC

svn commit: r1631269 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java src/org/apache/pig/impl/util/SpillableMemoryManager.java

Author: daijy
Date: Mon Oct 13 00:19:44 2014
New Revision: 1631269

URL: http://svn.apache.org/r1631269
Log:
PIG-3979: group all performance, garbage collection, and incremental aggregation

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1631269&r1=1631268&r2=1631269&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Oct 13 00:19:44 2014
@@ -94,6 +94,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3979: group all performance, garbage collection, and incremental aggregation (ddreyfus via daijy)
+
 PIG-4199: Mapreduce ACLs should be translated to Tez ACLs (rohini)
 
 PIG-4227: Streaming Python UDF handles bag outputs incorrectly (cheolsoo)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1631269&r1=1631268&r2=1631269&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Mon Oct 13 00:19:44 2014
@@ -63,10 +63,6 @@ public class POPartialAgg extends Physic
     private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
             null);
 
-    // number of records to sample to determine average size used by each
-    // entry in hash map and average seen reduction
-    private static final int NUM_RECS_TO_SAMPLE = 10000;
-
     // We want to avoid massive ArrayList copies as they get big.
     // Array Lists grow by prevSize + prevSize/2. Given default initial size of 10,
     // 9369 is the size of the array after 18 such resizings. This seems like a sufficiently
@@ -100,7 +96,15 @@ public class POPartialAgg extends Physic
     private boolean disableMapAgg = false;
     private boolean sizeReductionChecked = false;
     private boolean inputsExhausted = false;
+    // The doSpill flag is set when spilling is running or needs to run.
+    // It is set by POPartialAgg when its buffers are full after having run aggregations.
+    // The doContingentSpill flag is set when the SpillableMemoryManager is notified
+    // by GC that the runtime is low on memory and the SpillableMemoryManager identifies
+    // the particular buffer as a good spill candidate because it is large. The contingent spill logic tries
+    // to satisfy the memory manager's request for freeing memory by aggregating data
+    // rather than just spilling records to disk. 
     private volatile boolean doSpill = false;
+    private volatile boolean doContingentSpill = false;
     private transient MemoryLimits memLimits;
 
     private transient boolean initialized = false;
@@ -110,6 +114,8 @@ public class POPartialAgg extends Physic
     private int avgTupleSize = 0;
     private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
     private boolean estimatedMemThresholds = false;
+    private long sampleMem;
+    private long sampleSize = 0;
 
 
     public POPartialAgg(OperatorKey k) {
@@ -124,6 +130,9 @@ public class POPartialAgg extends Physic
             disableMapAgg();
         }
         initialized = true;
+        sampleMem = (long) (Runtime.getRuntime().maxMemory() * percent);
+        sampleSize = 0;
+
         SpillableMemoryManager.getInstance().registerSpillable(this);
     }
 
@@ -145,12 +154,39 @@ public class POPartialAgg extends Physic
         }
 
         while (true) {
-            if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+            if (!sizeReductionChecked && sampleSize >= sampleMem /*
+                                                                  * numRecsInRawMap
+                                                                  * >=
+                                                                  * NUM_RECS_TO_SAMPLE
+                                                                  */) {
                 checkSizeReduction();
+                sampleSize = 0;
             }
-            if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+            if (!estimatedMemThresholds && sampleSize >= sampleMem /*
+                                                                    * numRecsInRawMap
+                                                                    * >=
+                                                                    * NUM_RECS_TO_SAMPLE
+                                                                    */) {
                 estimateMemThresholds();
             }
+            if (doContingentSpill) {
+                doContingentSpill = false;
+                // Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
+                if (doSpill == false) {
+                    // SpillableMemoryManager requested a spill to reduce memory
+                    // consumption.
+                    // See if we can avoid it.
+                    aggregateFirstLevel();
+                    aggregateSecondLevel();
+                    if (!shouldSpill()) {
+                        LOG.debug("Spill triggered by SpillableMemoryManager suppressed");
+                    } else {
+                        LOG.debug("Spill triggered by SpillableMemoryManager");
+                        doSpill = true;
+                    }
+                }
+
+            }
             if (doSpill) {
                 startSpill();
                 Result result = spillResult();
@@ -175,7 +211,7 @@ public class POPartialAgg extends Physic
                         // parent input is over. flush what we have.
                         inputsExhausted = true;
                         startSpill();
-                        LOG.info("Spilling last bits.");
+                        LOG.debug("Spilling last bits.");
                         continue;
                     } else {
                         return EOP_RESULT;
@@ -194,6 +230,19 @@ public class POPartialAgg extends Physic
                     }
                     Object key = keyRes.result;
                     keyPlan.detachInput();
+                    if (numRecsInRawMap == 0) {
+                        sampleSize = 0;
+                    }
+                    // Collecting the tuple memory size is surprisingly
+                    // expensive.
+                    // Don't do it once the memory thresholds are estimated, since it's
+                    // no longer used.
+                    if (!estimatedMemThresholds) {
+                        int tupleSize = (int) inpTuple.getMemorySize();
+                        sampleSize += tupleSize;
+                        if (avgTupleSize == 0)
+                            avgTupleSize = tupleSize;
+                    }
                     numRecsInRawMap += 1;
                     addKeyValToMap(rawInputMap, key, inpTuple);
 
@@ -234,6 +283,13 @@ public class POPartialAgg extends Physic
             firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
             secondTierThreshold = (int) (0.5 + totalTuples *  (1f / sizeReduction));
             LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
+            // The second tier should allow at least one tuple before it tries to aggregate.
+            // This code retains the total number of tuples in the buffer while guaranteeing
+            // the second tier has at least one tuple.
+            if (secondTierThreshold == 0) {
+                secondTierThreshold += 1;
+                firstTierThreshold -= 1;
+            }
         }
         estimatedMemThresholds = true;
     }
@@ -243,9 +299,9 @@ public class POPartialAgg extends Physic
         aggregateFirstLevel();
         aggregateSecondLevel();
         int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
-        LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
+        LOG.debug("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
         int minReduction = getMinOutputReductionFromProp();
-        LOG.info("Observed reduction factor: from " + numBeforeReduction +
+        LOG.debug("Observed reduction factor: from " + numBeforeReduction +
                 " to " + numAfterReduction +
                 " => " + numBeforeReduction / numAfterReduction + ".");
         if ( numBeforeReduction / numAfterReduction < minReduction) {
@@ -266,15 +322,15 @@ public class POPartialAgg extends Physic
     }
 
     private boolean shouldAggregateFirstLevel() {
-        if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
-            LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
+        if (LOG.isDebugEnabled() && numRecsInRawMap > firstTierThreshold) {
+            LOG.debug("Aggregating " + numRecsInRawMap + " raw records.");
         }
         return (numRecsInRawMap > firstTierThreshold);
     }
 
     private boolean shouldAggregateSecondLevel() {
-        if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
-            LOG.info("Aggregating " + numRecsInProcessedMap + " secondary records.");
+        if (LOG.isDebugEnabled() && numRecsInProcessedMap > secondTierThreshold) {
+            LOG.debug("Aggregating " + numRecsInProcessedMap + " secondary records.");
         }
         return (numRecsInProcessedMap > secondTierThreshold);
     }
@@ -310,21 +366,21 @@ public class POPartialAgg extends Physic
         if (spillingIterator != null) return;
 
         if (!rawInputMap.isEmpty()) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
             }
             aggregateFirstLevel();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("processed inputs: " + numRecsInProcessedMap + " tuples.");
             }
         }
         if (!processedInputMap.isEmpty()) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
             }
             aggregateSecondLevel();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("processed inputs: " + numRecsInProcessedMap + " tuples.");
             }
         }
         doSpill = true;
@@ -335,7 +391,7 @@ public class POPartialAgg extends Physic
         // if no more to spill, return EOP_RESULT.
         if (processedInputMap.isEmpty()) {
             spillingIterator = null;
-            LOG.info("In spillResults(), processed map is empty -- done spilling.");
+            LOG.debug("In spillResults(), processed map is empty -- done spilling.");
             return EOP_RESULT;
         } else {
             Map.Entry<Object, List<Tuple>> entry = spillingIterator.next();
@@ -536,8 +592,7 @@ public class POPartialAgg extends Physic
 
     @Override
     public long spill() {
-        LOG.info("Spill triggered by SpillableMemoryManager");
-        doSpill = true;
+        doContingentSpill = true;
         return 0;
     }
 

Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1631269&r1=1631268&r2=1631269&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Mon Oct 13 00:19:44 2014
@@ -50,7 +50,9 @@ public class SpillableMemoryManager impl
     
     private final Log log = LogFactory.getLog(getClass());
     
-    LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+    private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+    // Snapshot of live spillables paired with their sizes, built for sorting
+    private LinkedList<SpillablePtr> spillablesSR = null;
     
     // if we freed at least this much, invoke GC 
     // (default 40 MB - this can be overridden by user supplied property)
@@ -60,10 +62,6 @@ public class SpillableMemoryManager impl
     // (default 5MB - this can be overridden by user supplied property)
     private static long spillFileSizeThreshold = 5000000L ;
     
-    // this will keep track of memory freed across spills
-    // and between GC invocations
-    private static long accumulatedFreeSize = 0L;
-    
     // fraction of biggest heap for which we want to get
     // "memory usage threshold exceeded" notifications
     private static double memoryThresholdFraction = 0.7;
@@ -77,11 +75,6 @@ public class SpillableMemoryManager impl
     
     // log notification on collection threshold exceeded only the first time
     private boolean firstCollectionThreshExceededLogged = false;
-
-    // fraction of the total heap used for the threshold to determine
-    // if we want to perform an extra gc before the spill
-    private static double extraGCThresholdFraction = 0.05;
-    private static long extraGCSpillSizeThreshold  = 0L;
     
     private static volatile SpillableMemoryManager manager;
 
@@ -90,7 +83,6 @@ public class SpillableMemoryManager impl
         List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
         MemoryPoolMXBean biggestHeap = null;
         long biggestSize = 0;
-        long totalSize = 0;
         for (MemoryPoolMXBean b: mpbeans) {
             log.debug("Found heap (" + b.getName() +
                 ") of type " + b.getType());
@@ -99,14 +91,12 @@ public class SpillableMemoryManager impl
                  * heap is the tenured heap
                  */
                 long size = b.getUsage().getMax();
-                totalSize += size;
                 if (size > biggestSize) {
                     biggestSize = size;
                     biggestHeap = b;
                 }
             }
         }
-        extraGCSpillSizeThreshold  = (long) (totalSize * extraGCThresholdFraction);
         if (biggestHeap == null) {
             throw new RuntimeException("Couldn't find heap");
         }
@@ -189,6 +179,7 @@ public class SpillableMemoryManager impl
             }
 
         }
+        // Remove cleared (null) spillable references to speed up the sort.
         clearSpillables();
         if (toFree < 0) {
             log.debug("low memory handler returning " + 
@@ -196,16 +187,31 @@ public class SpillableMemoryManager impl
             return;
         }
         synchronized(spillables) {
-            Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {
-
-                /**
-                 * We don't lock anything, so this sort may not be stable if a WeakReference suddenly
-                 * becomes null, but it will be close enough.
-                 * Also between the time we sort and we use these spillables, they
-                 * may actually change in size - so this is just best effort
-                 */    
+            /**
+             * Store a reference to a spillable and its size into a stable
+             * list so that sizes cannot change mid-sort (Java 7 comparator contract).
+             * Between the time we sort and we use these spillables, they
+             * may actually change in size.
+             */
+            spillablesSR = new LinkedList<SpillablePtr>();
+            for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
+                // Check that the object still exists before adding it to the snapshot list.
+                Spillable s = i.next().get();
+                if (s == null) {
+                    i.remove();
+                    continue;
+                }
+                // Get a ptr to the spillable and its current size.
+                // we need a stable size for sorting.
+                spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
+            }
+            log.debug("Spillables list size: " + spillablesSR.size());
+            Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
+                // Sort the list in descending order. We spill the biggest items first,
+                // and only as many as needed to reduce memory usage
+                // below the threshold.
                 @Override
-                public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
+                public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
                     Spillable o1 = o1Ref.get();
                     Spillable o2 = o2Ref.get();
                     if (o1 == null && o2 == null) {
@@ -231,77 +237,64 @@ public class SpillableMemoryManager impl
             });
             long estimatedFreed = 0;
             int numObjSpilled = 0;
-            boolean invokeGC = false;
-            boolean extraGCCalled = false;
-            for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
-                WeakReference<Spillable> weakRef = i.next();
-                Spillable s = weakRef.get();
-                // Still need to check for null here, even after we removed
-                // above, because the reference may have gone bad on us
-                // since the last check.
-                if (s == null) {
-                    i.remove();
-                    continue;
-                }
-                long toBeFreed = s.getMemorySize();
+            /*
+             * Before PIG-3979, Pig invoked System.gc() inside this hook,
+             * but calling gc from within a gc notification seems to cause a lot of gc activity.
+             * More accurately, on Java implementations in which System.gc() does force a gc,
+             * this causes a lot of looping. On systems in which System.gc() is just a suggestion,
+             * relying on the outcome of the System.gc() call is a mistake.
+             *
+             * Therefore, this version of the code does away with attempts to guess what happens
+             * when we call s.spill(). For POPartialAgg spillables, the call tells the spillable
+             * to reduce itself. No data is necessarily written to disk. 
+             */
+            for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
+                SpillablePtr sPtr = i.next();
+                long toBeFreed = sPtr.getMemorySize();
                 log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
                 // Don't keep trying if the rest of files are too small
                 if (toBeFreed < spillFileSizeThreshold) {
                     log.debug("spilling small files - getting out of memory handler");
                     break ;
                 }
-                // If single Spillable is bigger than the threshold,
-                // we force GC to make sure we really need to keep this
-                // object before paying for the expensive spill().
-                // Done at most once per handleNotification.
-                if( !extraGCCalled && extraGCSpillSizeThreshold != 0
-                    && toBeFreed > extraGCSpillSizeThreshold   ) {
-                    log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
-                    // this extra assignment to null is needed so that gc can free the
-                    // spillable if nothing else is pointing at it
-                    s = null;
-                    System.gc();
-                    extraGCCalled = true;
-                    // checking again to see if this reference is still valid
-                    s = weakRef.get();
-                    if (s == null) {
-                        i.remove();
-                        accumulatedFreeSize = 0;
-                        invokeGC = false;
-                        continue;
-                    }
-                }
-                s.spill();               
+                Spillable s = sPtr.get();
+                if (s != null)
+                    s.spill();
                 numObjSpilled++;
                 estimatedFreed += toBeFreed;
-                accumulatedFreeSize += toBeFreed;
-                // This should significantly reduce the number of small files
-                // in case that we have a lot of nested bags
-                if (accumulatedFreeSize > gcActivationSize) {
-                    invokeGC = true;
-                }
-                
+
                 if (estimatedFreed > toFree) {
                     log.debug("Freed enough space - getting out of memory handler");
-                    invokeGC = true;
                     break;
                 }
-            }           
-            /* Poke the GC again to see if we successfully freed enough memory */
-            if(invokeGC) {
-                System.gc();
-                // now that we have invoked the GC, reset accumulatedFreeSize
-                accumulatedFreeSize = 0;
             }
-            if(estimatedFreed > 0){
+            // We are done with the snapshot list of spillables; drop it.
+            spillablesSR = null;
+
+            if (estimatedFreed > 0) {
                 String msg = "Spilled an estimate of " + estimatedFreed +
                 " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
-                log.info(msg);
+                log.debug(msg);
             }
 
         }
     }
     
+    public static class SpillablePtr { // Pairs a weakly held Spillable with a size captured at construction.
+        private WeakReference<Spillable> spillable; // weak: does not keep the Spillable reachable
+        private long size; // memory size supplied by the caller; never refreshed afterwards
+        SpillablePtr(Spillable p, long s) {
+            spillable = new WeakReference<Spillable>(p);
+            size = s;
+        }
+        public Spillable get() { // may return null once the Spillable has been garbage collected
+            return spillable.get();
+        }
+        public long getMemorySize() { // the captured size, not a live measurement
+            return size;
+        }
+    }
+
     public void clearSpillables() {
         synchronized (spillables) {
             // Walk the list first and remove nulls, otherwise the sort
@@ -322,7 +315,7 @@ public class SpillableMemoryManager impl
      */
     public void registerSpillable(Spillable s) {
         synchronized(spillables) {
-            // Cleaing the entire list is too expensive.  Just trim off the front while
+            // Cleaning the entire list is too expensive.  Just trim off the front while
             // we can.
             WeakReference<Spillable> first = spillables.peek();
             while (first != null && first.get() == null) {