You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/17 21:48:48 UTC

svn commit: r1725124 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java src/org/apache/pig/impl/util/SpillableMemoryManager.java

Author: rohini
Date: Sun Jan 17 20:48:48 2016
New Revision: 1725124

URL: http://svn.apache.org/viewvc?rev=1725124&view=rev
Log:
PIG-4782: OutOfMemoryError: GC overhead limit exceeded with POPartialAgg (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1725124&r1=1725123&r2=1725124&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Jan 17 20:48:48 2016
@@ -81,6 +81,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4782: OutOfMemoryError: GC overhead limit exceeded with POPartialAgg (rohini)
+
 PIG-4737: Check and fix clone implementation for all classes extending PhysicalOperator (rohini)
 
 PIG-4770: OOM with POPartialAgg in some cases (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1725124&r1=1725123&r2=1725124&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Sun Jan 17 20:48:48 2016
@@ -119,6 +119,7 @@ public class POPartialAgg extends Physic
     // rather than just spilling records to disk.
     private transient volatile boolean doSpill;
     private transient volatile boolean doContingentSpill;
+    private transient volatile boolean startedContingentSpill;
     private transient volatile Object spillLock;
 
     private transient int minOutputReduction;
@@ -218,6 +219,7 @@ public class POPartialAgg extends Physic
                 estimateMemThresholds();
             }
             if (doContingentSpill) {
+                startedContingentSpill = true;
                 // Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
                 if (doSpill == false) {
                     // SpillableMemoryManager requested a spill to reduce memory
@@ -413,7 +415,7 @@ public class POPartialAgg extends Physic
 
         LOG.info("Starting spill.");
         if (aggregate) {
-            aggregateBothLevels(false, true);
+            aggregateBothLevels(false, false);
         }
         doSpill = true;
         spillingIterator = processedInputMap.entrySet().iterator();
@@ -638,20 +640,44 @@ public class POPartialAgg extends Physic
             return 0;
         } else {
             LOG.info("Spill triggered by SpillableMemoryManager");
-            doContingentSpill = true;
             synchronized(spillLock) {
-                if (!sizeReductionChecked) {
+                if (rawInputMap != null) {
+                    LOG.info("Memory usage: " + getMemorySize()
+                            + ". Raw map: num keys = " + rawInputMap.size()
+                            + ", num tuples = "+ numRecsInRawMap
+                            + ", Processed map: num keys = " + processedInputMap.size()
+                            + ", num tuples = "+ numRecsInProcessedMap );
+                }
+                startedContingentSpill = false;
+                doContingentSpill = true;
+                if (!sizeReductionChecked || !estimatedMemThresholds) {
                     numRecordsToSample = numRecsInRawMap;
                 }
                 try {
+                    // Block till spilling is finished. If main thread execution has not come to POPartialAgg
+                    // and is still processing lower pipeline for more than 5 seconds it means
+                    // jvm is stuck doing GC and will soon fail with java.lang.OutOfMemoryError: GC overhead limit exceeded
+                    // So exit out of here so that SpillableMemoryManger can at least spill
+                    // other Spillable bags and free up some memory for user code to be able to run
+                    // and reach POPartialAgg for the aggregation/spilling of the hashmaps to happen.
+                    long startTime = System.currentTimeMillis();
                     while (doContingentSpill == true) {
-                        Thread.sleep(50); //Keeping it on the lower side for now. Tune later
+                        Thread.sleep(25);
+                        if (!startedContingentSpill && (System.currentTimeMillis() - startTime) >= 5000) {
+                            break;
+                        }
+                    }
+                    if (doContingentSpill) {
+                        LOG.info("Not blocking for spill and letting SpillableMemoryManager"
+                                + " process other spillable objects as main thread has not reached here for 5 secs");
+                    } else {
+                        LOG.info("Finished spill for SpillableMemoryManager call");
+                        return 1;
                     }
                 } catch (InterruptedException e) {
                     LOG.warn("Interrupted exception while waiting for spill to finish", e);
                 }
-                LOG.info("Finished spill for SpillableMemoryManager call");
-                return 1;
+                return 0;
             }
         }
     }

Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1725124&r1=1725123&r2=1725124&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Sun Jan 17 20:48:48 2016
@@ -285,15 +285,18 @@ public class SpillableMemoryManager impl
                     // Unblock registering of new bags temporarily as aggregation
                     // of POPartialAgg requires new record to be loaded.
                     blockRegisterOnSpill = !isGroupingSpillable;
+                    long numSpilled;
                     try {
-                        s.spill();
+                        numSpilled = s.spill();
                     } finally {
                         blockRegisterOnSpill = true;
                     }
 
-                    numObjSpilled++;
-                    estimatedFreed += toBeFreed;
-                    accumulatedFreeSize += toBeFreed;
+                    if (numSpilled > 0) {
+                        numObjSpilled++;
+                        estimatedFreed += toBeFreed;
+                        accumulatedFreeSize += toBeFreed;
+                    }
                     // This should significantly reduce the number of small files
                     // in case that we have a lot of nested bags
                     if (accumulatedFreeSize > gcActivationSize) {