You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/17 21:48:48 UTC
svn commit: r1725124 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
src/org/apache/pig/impl/util/SpillableMemoryManager.java
Author: rohini
Date: Sun Jan 17 20:48:48 2016
New Revision: 1725124
URL: http://svn.apache.org/viewvc?rev=1725124&view=rev
Log:
PIG-4782: OutOfMemoryError: GC overhead limit exceeded with POPartialAgg (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1725124&r1=1725123&r2=1725124&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Jan 17 20:48:48 2016
@@ -81,6 +81,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4782: OutOfMemoryError: GC overhead limit exceeded with POPartialAgg (rohini)
+
PIG-4737: Check and fix clone implementation for all classes extending PhysicalOperator (rohini)
PIG-4770: OOM with POPartialAgg in some cases (rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1725124&r1=1725123&r2=1725124&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Sun Jan 17 20:48:48 2016
@@ -119,6 +119,7 @@ public class POPartialAgg extends Physic
// rather than just spilling records to disk.
private transient volatile boolean doSpill;
private transient volatile boolean doContingentSpill;
+ private transient volatile boolean startedContingentSpill;
private transient volatile Object spillLock;
private transient int minOutputReduction;
@@ -218,6 +219,7 @@ public class POPartialAgg extends Physic
estimateMemThresholds();
}
if (doContingentSpill) {
+ startedContingentSpill = true;
// Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
if (doSpill == false) {
// SpillableMemoryManager requested a spill to reduce memory
@@ -413,7 +415,7 @@ public class POPartialAgg extends Physic
LOG.info("Starting spill.");
if (aggregate) {
- aggregateBothLevels(false, true);
+ aggregateBothLevels(false, false);
}
doSpill = true;
spillingIterator = processedInputMap.entrySet().iterator();
@@ -638,20 +640,44 @@ public class POPartialAgg extends Physic
return 0;
} else {
LOG.info("Spill triggered by SpillableMemoryManager");
- doContingentSpill = true;
synchronized(spillLock) {
- if (!sizeReductionChecked) {
+ if (rawInputMap != null) {
+ LOG.info("Memory usage: " + getMemorySize()
+ + ". Raw map: num keys = " + rawInputMap.size()
+ + ", num tuples = "+ numRecsInRawMap
+ + ", Processed map: num keys = " + processedInputMap.size()
+ + ", num tuples = "+ numRecsInProcessedMap );
+ }
+ startedContingentSpill = false;
+ doContingentSpill = true;
+ if (!sizeReductionChecked || !estimatedMemThresholds) {
numRecordsToSample = numRecsInRawMap;
}
try {
+ // Block till spilling is finished. If main thread execution has not come to POPartialAgg
+ // and is still processing lower pipeline for more than 5 seconds it means
+ // the JVM is stuck doing GC and will soon fail with java.lang.OutOfMemoryError: GC overhead limit exceeded.
+ // So exit out of here so that SpillableMemoryManager can at least spill
+ // other Spillable bags and free up some memory for user code to be able to run
+ // and reach POPartialAgg for the aggregation/spilling of the hashmaps to happen.
+ long startTime = System.currentTimeMillis();
while (doContingentSpill == true) {
- Thread.sleep(50); //Keeping it on the lower side for now. Tune later
+ Thread.sleep(25);
+ if (!startedContingentSpill && (System.currentTimeMillis() - startTime) >= 5000) {
+ break;
+ }
+ }
+ if (doContingentSpill) {
+ LOG.info("Not blocking for spill and letting SpillableMemoryManager"
+ + " process other spillable objects as main thread has not reached here for 5 secs");
+ } else {
+ LOG.info("Finished spill for SpillableMemoryManager call");
+ return 1;
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting for spill to finish", e);
}
- LOG.info("Finished spill for SpillableMemoryManager call");
- return 1;
+ return 0;
}
}
}
Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1725124&r1=1725123&r2=1725124&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Sun Jan 17 20:48:48 2016
@@ -285,15 +285,18 @@ public class SpillableMemoryManager impl
// Unblock registering of new bags temporarily as aggregation
// of POPartialAgg requires new record to be loaded.
blockRegisterOnSpill = !isGroupingSpillable;
+ long numSpilled;
try {
- s.spill();
+ numSpilled = s.spill();
} finally {
blockRegisterOnSpill = true;
}
- numObjSpilled++;
- estimatedFreed += toBeFreed;
- accumulatedFreeSize += toBeFreed;
+ if (numSpilled > 0) {
+ numObjSpilled++;
+ estimatedFreed += toBeFreed;
+ accumulatedFreeSize += toBeFreed;
+ }
// This should significantly reduce the number of small files
// in case that we have a lot of nested bags
if (accumulatedFreeSize > gcActivationSize) {