You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/10/17 23:19:52 UTC
svn commit: r1632668 - in /pig/branches/branch-0.14: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
src/org/apache/pig/impl/util/SpillableMemoryManager.java
Author: daijy
Date: Fri Oct 17 21:19:52 2014
New Revision: 1632668
URL: http://svn.apache.org/r1632668
Log:
Rollback PIG-3979: group all performance, garbage collection, and incremental aggregation
Modified:
pig/branches/branch-0.14/CHANGES.txt
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1632668&r1=1632667&r2=1632668&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Fri Oct 17 21:19:52 2014
@@ -92,8 +92,6 @@ PIG-2834: MultiStorage requires unused c
PIG-4230: Documentation fix: first nested foreach example is incomplete (lbendig via daijy)
-PIG-3979: group all performance, garbage collection, and incremental aggregation (ddreyfus via daijy)
-
PIG-4199: Mapreduce ACLs should be translated to Tez ACLs (rohini)
PIG-4227: Streaming Python UDF handles bag outputs incorrectly (cheolsoo)
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1632668&r1=1632667&r2=1632668&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Fri Oct 17 21:19:52 2014
@@ -63,6 +63,10 @@ public class POPartialAgg extends Physic
private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
null);
+ // number of records to sample to determine average size used by each
+ // entry in hash map and average seen reduction
+ private static final int NUM_RECS_TO_SAMPLE = 10000;
+
// We want to avoid massive ArrayList copies as they get big.
// Array Lists grow by prevSize + prevSize/2. Given default initial size of 10,
// 9369 is the size of the array after 18 such resizings. This seems like a sufficiently
@@ -96,15 +100,7 @@ public class POPartialAgg extends Physic
private boolean disableMapAgg = false;
private boolean sizeReductionChecked = false;
private boolean inputsExhausted = false;
- // The doSpill flag is set when spilling is running or needs to run.
- // It is set by POPartialAgg when its buffers are full after having run aggregations.
- // The doContingentSpill flag is set when the SpillableMemoryManager is notified
- // by GC that the runtime is low on memory and the SpillableMemoryManager identifies
- // the particular buffer as a good spill candidate because it is large. The contingent spill logic tries
- // to satisfy the memory manager's request for freeing memory by aggregating data
- // rather than just spilling records to disk.
private volatile boolean doSpill = false;
- private volatile boolean doContingentSpill = false;
private transient MemoryLimits memLimits;
private transient boolean initialized = false;
@@ -114,8 +110,6 @@ public class POPartialAgg extends Physic
private int avgTupleSize = 0;
private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
private boolean estimatedMemThresholds = false;
- private long sampleMem;
- private long sampleSize = 0;
public POPartialAgg(OperatorKey k) {
@@ -130,9 +124,6 @@ public class POPartialAgg extends Physic
disableMapAgg();
}
initialized = true;
- sampleMem = (long) (Runtime.getRuntime().maxMemory() * percent);
- sampleSize = 0;
-
SpillableMemoryManager.getInstance().registerSpillable(this);
}
@@ -154,39 +145,12 @@ public class POPartialAgg extends Physic
}
while (true) {
- if (!sizeReductionChecked && sampleSize >= sampleMem /*
- * numRecsInRawMap
- * >=
- * NUM_RECS_TO_SAMPLE
- */) {
+ if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
checkSizeReduction();
- sampleSize = 0;
}
- if (!estimatedMemThresholds && sampleSize >= sampleMem /*
- * numRecsInRawMap
- * >=
- * NUM_RECS_TO_SAMPLE
- */) {
+ if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
estimateMemThresholds();
}
- if (doContingentSpill) {
- doContingentSpill = false;
- // Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
- if (doSpill == false) {
- // SpillableMemoryManager requested a spill to reduce memory
- // consumption.
- // See if we can avoid it.
- aggregateFirstLevel();
- aggregateSecondLevel();
- if (!shouldSpill()) {
- LOG.debug("Spill triggered by SpillableMemoryManager suppressed");
- } else {
- LOG.debug("Spill triggered by SpillableMemoryManager");
- doSpill = true;
- }
- }
-
- }
if (doSpill) {
startSpill();
Result result = spillResult();
@@ -211,7 +175,7 @@ public class POPartialAgg extends Physic
// parent input is over. flush what we have.
inputsExhausted = true;
startSpill();
- LOG.debug("Spilling last bits.");
+ LOG.info("Spilling last bits.");
continue;
} else {
return EOP_RESULT;
@@ -230,19 +194,6 @@ public class POPartialAgg extends Physic
}
Object key = keyRes.result;
keyPlan.detachInput();
- if (numRecsInRawMap == 0) {
- sampleSize = 0;
- }
- // Collecting the tuple memory size is surprisingly
- // expensive.
- // don't do it after we have the memory threshold since it's
- // no longer used.
- if (!estimatedMemThresholds) {
- int tupleSize = (int) inpTuple.getMemorySize();
- sampleSize += tupleSize;
- if (avgTupleSize == 0)
- avgTupleSize = tupleSize;
- }
numRecsInRawMap += 1;
addKeyValToMap(rawInputMap, key, inpTuple);
@@ -283,13 +234,6 @@ public class POPartialAgg extends Physic
firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
secondTierThreshold = (int) (0.5 + totalTuples * (1f / sizeReduction));
LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
- // The second tier should at least allow one tuple before it tries to aggregate.
- // This code retains the total number of tuples in the buffer while guaranteeing
- // the second tier has at least one tuple.
- if (secondTierThreshold == 0) {
- secondTierThreshold += 1;
- firstTierThreshold -= 1;
- }
}
estimatedMemThresholds = true;
}
@@ -299,9 +243,9 @@ public class POPartialAgg extends Physic
aggregateFirstLevel();
aggregateSecondLevel();
int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
- LOG.debug("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
+ LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
int minReduction = getMinOutputReductionFromProp();
- LOG.debug("Observed reduction factor: from " + numBeforeReduction +
+ LOG.info("Observed reduction factor: from " + numBeforeReduction +
" to " + numAfterReduction +
" => " + numBeforeReduction / numAfterReduction + ".");
if ( numBeforeReduction / numAfterReduction < minReduction) {
@@ -322,15 +266,15 @@ public class POPartialAgg extends Physic
}
private boolean shouldAggregateFirstLevel() {
- if (LOG.isDebugEnabled() && numRecsInRawMap > firstTierThreshold) {
- LOG.debug("Aggregating " + numRecsInRawMap + " raw records.");
+ if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
+ LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
}
return (numRecsInRawMap > firstTierThreshold);
}
private boolean shouldAggregateSecondLevel() {
- if (LOG.isDebugEnabled() && numRecsInProcessedMap > secondTierThreshold) {
- LOG.debug("Aggregating " + numRecsInProcessedMap + " secondary records.");
+ if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
+ LOG.info("Aggregating " + numRecsInProcessedMap + " secondary records.");
}
return (numRecsInProcessedMap > secondTierThreshold);
}
@@ -366,21 +310,21 @@ public class POPartialAgg extends Physic
if (spillingIterator != null) return;
if (!rawInputMap.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
}
aggregateFirstLevel();
- if (LOG.isDebugEnabled()) {
- LOG.debug("processed inputs: " + numRecsInProcessedMap + " tuples.");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
}
}
if (!processedInputMap.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
}
aggregateSecondLevel();
- if (LOG.isDebugEnabled()) {
- LOG.debug("processed inputs: " + numRecsInProcessedMap + " tuples.");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
}
}
doSpill = true;
@@ -391,7 +335,7 @@ public class POPartialAgg extends Physic
// if no more to spill, return EOP_RESULT.
if (processedInputMap.isEmpty()) {
spillingIterator = null;
- LOG.debug("In spillResults(), processed map is empty -- done spilling.");
+ LOG.info("In spillResults(), processed map is empty -- done spilling.");
return EOP_RESULT;
} else {
Map.Entry<Object, List<Tuple>> entry = spillingIterator.next();
@@ -592,7 +536,8 @@ public class POPartialAgg extends Physic
@Override
public long spill() {
- doContingentSpill = true;
+ LOG.info("Spill triggered by SpillableMemoryManager");
+ doSpill = true;
return 0;
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1632668&r1=1632667&r2=1632668&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri Oct 17 21:19:52 2014
@@ -50,9 +50,7 @@ public class SpillableMemoryManager impl
private final Log log = LogFactory.getLog(getClass());
- private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
- // String references to spillables
- private LinkedList<SpillablePtr> spillablesSR = null;
+ LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
// if we freed at least this much, invoke GC
// (default 40 MB - this can be overridden by user supplied property)
@@ -62,6 +60,10 @@ public class SpillableMemoryManager impl
// (default 5MB - this can be overridden by user supplied property)
private static long spillFileSizeThreshold = 5000000L ;
+ // this will keep track of memory freed across spills
+ // and between GC invocations
+ private static long accumulatedFreeSize = 0L;
+
// fraction of biggest heap for which we want to get
// "memory usage threshold exceeded" notifications
private static double memoryThresholdFraction = 0.7;
@@ -75,6 +77,11 @@ public class SpillableMemoryManager impl
// log notification on collection threshold exceeded only the first time
private boolean firstCollectionThreshExceededLogged = false;
+
+ // fraction of the total heap used for the threshold to determine
+ // if we want to perform an extra gc before the spill
+ private static double extraGCThresholdFraction = 0.05;
+ private static long extraGCSpillSizeThreshold = 0L;
private static volatile SpillableMemoryManager manager;
@@ -83,6 +90,7 @@ public class SpillableMemoryManager impl
List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
MemoryPoolMXBean biggestHeap = null;
long biggestSize = 0;
+ long totalSize = 0;
for (MemoryPoolMXBean b: mpbeans) {
log.debug("Found heap (" + b.getName() +
") of type " + b.getType());
@@ -91,12 +99,14 @@ public class SpillableMemoryManager impl
* heap is the tenured heap
*/
long size = b.getUsage().getMax();
+ totalSize += size;
if (size > biggestSize) {
biggestSize = size;
biggestHeap = b;
}
}
}
+ extraGCSpillSizeThreshold = (long) (totalSize * extraGCThresholdFraction);
if (biggestHeap == null) {
throw new RuntimeException("Couldn't find heap");
}
@@ -179,7 +189,6 @@ public class SpillableMemoryManager impl
}
}
- // Remove empty spillables to improve sort speed.
clearSpillables();
if (toFree < 0) {
log.debug("low memory handler returning " +
@@ -187,31 +196,16 @@ public class SpillableMemoryManager impl
return;
}
synchronized(spillables) {
- /**
- * Store a reference to a spillable and its size into a stable
- * list so that the sort is stable (a Java 7 contract).
- * Between the time we sort and we use these spillables, they
- * may actually change in size.
- */
- spillablesSR = new LinkedList<SpillablePtr>();
- for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
- // Check that the object still exists before adding to the Strong Referenced list.
- Spillable s = i.next().get();
- if (s == null) {
- i.remove();
- continue;
- }
- // Get a ptr to the spillable and its current size.
- // we need a stable size for sorting.
- spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
- }
- log.debug("Spillables list size: " + spillablesSR.size());
- Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
- // Sort the list in descending order. We spill the biggest items first,
- // and only as many as we need to to reduce memory usage
- // below the threshold.
+ Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {
+
+ /**
+ * We don't lock anything, so this sort may not be stable if a WeakReference suddenly
+ * becomes null, but it will be close enough.
+ * Also between the time we sort and we use these spillables, they
+ * may actually change in size - so this is just best effort
+ */
@Override
- public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
+ public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
Spillable o1 = o1Ref.get();
Spillable o2 = o2Ref.get();
if (o1 == null && o2 == null) {
@@ -237,64 +231,77 @@ public class SpillableMemoryManager impl
});
long estimatedFreed = 0;
int numObjSpilled = 0;
- /*
- * Before PIG-3979, Pig invoke System.gc inside this hook,
- * but calling gc from within a gc notification seems to cause a lot of gc activity.
- * More accurately, on Java implementations in which System.gc() does force a gc,
- * this causes a lot of looping. On systems in which System.gc() is just a suggestion,
- * relying on the outcome of the System.gc() call is a mistake.
- *
- * Therefore, this version of the code does away with attempts to guess what happens
- * what we call s.spill(). For POPartionAgg spillables, the call tells the spillable
- * to reduce itself. No data is necessarily written to disk.
- */
- for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
- SpillablePtr sPtr = i.next();
- long toBeFreed = sPtr.getMemorySize();
+ boolean invokeGC = false;
+ boolean extraGCCalled = false;
+ for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
+ WeakReference<Spillable> weakRef = i.next();
+ Spillable s = weakRef.get();
+ // Still need to check for null here, even after we removed
+ // above, because the reference may have gone bad on us
+ // since the last check.
+ if (s == null) {
+ i.remove();
+ continue;
+ }
+ long toBeFreed = s.getMemorySize();
log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
// Don't keep trying if the rest of files are too small
if (toBeFreed < spillFileSizeThreshold) {
log.debug("spilling small files - getting out of memory handler");
break ;
}
- Spillable s = sPtr.get();
- if (s != null)
- s.spill();
+ // If single Spillable is bigger than the threshold,
+ // we force GC to make sure we really need to keep this
+ // object before paying for the expensive spill().
+ // Done at most once per handleNotification.
+ if( !extraGCCalled && extraGCSpillSizeThreshold != 0
+ && toBeFreed > extraGCSpillSizeThreshold ) {
+ log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
+ // this extra assignment to null is needed so that gc can free the
+ // spillable if nothing else is pointing at it
+ s = null;
+ System.gc();
+ extraGCCalled = true;
+ // checking again to see if this reference is still valid
+ s = weakRef.get();
+ if (s == null) {
+ i.remove();
+ accumulatedFreeSize = 0;
+ invokeGC = false;
+ continue;
+ }
+ }
+ s.spill();
numObjSpilled++;
estimatedFreed += toBeFreed;
-
+ accumulatedFreeSize += toBeFreed;
+ // This should significantly reduce the number of small files
+ // in case that we have a lot of nested bags
+ if (accumulatedFreeSize > gcActivationSize) {
+ invokeGC = true;
+ }
+
if (estimatedFreed > toFree) {
log.debug("Freed enough space - getting out of memory handler");
+ invokeGC = true;
break;
}
+ }
+ /* Poke the GC again to see if we successfully freed enough memory */
+ if(invokeGC) {
+ System.gc();
+ // now that we have invoked the GC, reset accumulatedFreeSize
+ accumulatedFreeSize = 0;
}
- // We are done with the strongly referenced list of spillables
- spillablesSR = null;
-
- if (estimatedFreed > 0) {
+ if(estimatedFreed > 0){
String msg = "Spilled an estimate of " + estimatedFreed +
" bytes from " + numObjSpilled + " objects. " + info.getUsage();;
- log.debug(msg);
+ log.info(msg);
}
}
}
- public static class SpillablePtr {
- private WeakReference<Spillable> spillable;
- private long size;
- SpillablePtr(Spillable p, long s) {
- spillable = new WeakReference<Spillable>(p);
- size = s;
- }
- public Spillable get() {
- return spillable.get();
- }
- public long getMemorySize() {
- return size;
- }
- }
-
public void clearSpillables() {
synchronized (spillables) {
// Walk the list first and remove nulls, otherwise the sort
@@ -315,7 +322,7 @@ public class SpillableMemoryManager impl
*/
public void registerSpillable(Spillable s) {
synchronized(spillables) {
- // Cleaning the entire list is too expensive. Just trim off the front while
+ // Cleaing the entire list is too expensive. Just trim off the front while
// we can.
WeakReference<Spillable> first = spillables.peek();
while (first != null && first.get() == null) {