You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/10/13 02:19:45 UTC
svn commit: r1631269 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
src/org/apache/pig/impl/util/SpillableMemoryManager.java
Author: daijy
Date: Mon Oct 13 00:19:44 2014
New Revision: 1631269
URL: http://svn.apache.org/r1631269
Log:
PIG-3979: group all performance, garbage collection, and incremental aggregation
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1631269&r1=1631268&r2=1631269&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Oct 13 00:19:44 2014
@@ -94,6 +94,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3979: group all performance, garbage collection, and incremental aggregation (ddreyfus via daijy)
+
PIG-4199: Mapreduce ACLs should be translated to Tez ACLs (rohini)
PIG-4227: Streaming Python UDF handles bag outputs incorrectly (cheolsoo)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1631269&r1=1631268&r2=1631269&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Mon Oct 13 00:19:44 2014
@@ -63,10 +63,6 @@ public class POPartialAgg extends Physic
private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
null);
- // number of records to sample to determine average size used by each
- // entry in hash map and average seen reduction
- private static final int NUM_RECS_TO_SAMPLE = 10000;
-
// We want to avoid massive ArrayList copies as they get big.
// Array Lists grow by prevSize + prevSize/2. Given default initial size of 10,
// 9369 is the size of the array after 18 such resizings. This seems like a sufficiently
@@ -100,7 +96,15 @@ public class POPartialAgg extends Physic
private boolean disableMapAgg = false;
private boolean sizeReductionChecked = false;
private boolean inputsExhausted = false;
+ // The doSpill flag is set when spilling is running or needs to run.
+ // It is set by POPartialAgg when its buffers are full after having run aggregations.
+ // The doContingentSpill flag is set when the SpillableMemoryManager is notified
+ // by GC that the runtime is low on memory and the SpillableMemoryManager identifies
+ // the particular buffer as a good spill candidate because it is large. The contingent spill logic tries
+ // to satisfy the memory manager's request for freeing memory by aggregating data
+ // rather than just spilling records to disk.
private volatile boolean doSpill = false;
+ private volatile boolean doContingentSpill = false;
private transient MemoryLimits memLimits;
private transient boolean initialized = false;
@@ -110,6 +114,8 @@ public class POPartialAgg extends Physic
private int avgTupleSize = 0;
private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
private boolean estimatedMemThresholds = false;
+ private long sampleMem;
+ private long sampleSize = 0;
public POPartialAgg(OperatorKey k) {
@@ -124,6 +130,9 @@ public class POPartialAgg extends Physic
disableMapAgg();
}
initialized = true;
+ sampleMem = (long) (Runtime.getRuntime().maxMemory() * percent);
+ sampleSize = 0;
+
SpillableMemoryManager.getInstance().registerSpillable(this);
}
@@ -145,12 +154,39 @@ public class POPartialAgg extends Physic
}
while (true) {
- if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+ if (!sizeReductionChecked && sampleSize >= sampleMem /*
+ * numRecsInRawMap
+ * >=
+ * NUM_RECS_TO_SAMPLE
+ */) {
checkSizeReduction();
+ sampleSize = 0;
}
- if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+ if (!estimatedMemThresholds && sampleSize >= sampleMem /*
+ * numRecsInRawMap
+ * >=
+ * NUM_RECS_TO_SAMPLE
+ */) {
estimateMemThresholds();
}
+ if (doContingentSpill) {
+ doContingentSpill = false;
+ // Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
+ if (doSpill == false) {
+ // SpillableMemoryManager requested a spill to reduce memory
+ // consumption.
+ // See if we can avoid it.
+ aggregateFirstLevel();
+ aggregateSecondLevel();
+ if (!shouldSpill()) {
+ LOG.debug("Spill triggered by SpillableMemoryManager suppressed");
+ } else {
+ LOG.debug("Spill triggered by SpillableMemoryManager");
+ doSpill = true;
+ }
+ }
+
+ }
if (doSpill) {
startSpill();
Result result = spillResult();
@@ -175,7 +211,7 @@ public class POPartialAgg extends Physic
// parent input is over. flush what we have.
inputsExhausted = true;
startSpill();
- LOG.info("Spilling last bits.");
+ LOG.debug("Spilling last bits.");
continue;
} else {
return EOP_RESULT;
@@ -194,6 +230,19 @@ public class POPartialAgg extends Physic
}
Object key = keyRes.result;
keyPlan.detachInput();
+ if (numRecsInRawMap == 0) {
+ sampleSize = 0;
+ }
+ // Collecting the tuple memory size is surprisingly
+ // expensive.
+ // don't do it after we have the memory threshold since it's
+ // no longer used.
+ if (!estimatedMemThresholds) {
+ int tupleSize = (int) inpTuple.getMemorySize();
+ sampleSize += tupleSize;
+ if (avgTupleSize == 0)
+ avgTupleSize = tupleSize;
+ }
numRecsInRawMap += 1;
addKeyValToMap(rawInputMap, key, inpTuple);
@@ -234,6 +283,13 @@ public class POPartialAgg extends Physic
firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
secondTierThreshold = (int) (0.5 + totalTuples * (1f / sizeReduction));
LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
+ // The second tier should at least allow one tuple before it tries to aggregate.
+ // This code retains the total number of tuples in the buffer while guaranteeing
+ // the second tier has at least one tuple.
+ if (secondTierThreshold == 0) {
+ secondTierThreshold += 1;
+ firstTierThreshold -= 1;
+ }
}
estimatedMemThresholds = true;
}
@@ -243,9 +299,9 @@ public class POPartialAgg extends Physic
aggregateFirstLevel();
aggregateSecondLevel();
int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
- LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
+ LOG.debug("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
int minReduction = getMinOutputReductionFromProp();
- LOG.info("Observed reduction factor: from " + numBeforeReduction +
+ LOG.debug("Observed reduction factor: from " + numBeforeReduction +
" to " + numAfterReduction +
" => " + numBeforeReduction / numAfterReduction + ".");
if ( numBeforeReduction / numAfterReduction < minReduction) {
@@ -266,15 +322,15 @@ public class POPartialAgg extends Physic
}
private boolean shouldAggregateFirstLevel() {
- if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
- LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
+ if (LOG.isDebugEnabled() && numRecsInRawMap > firstTierThreshold) {
+ LOG.debug("Aggregating " + numRecsInRawMap + " raw records.");
}
return (numRecsInRawMap > firstTierThreshold);
}
private boolean shouldAggregateSecondLevel() {
- if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
- LOG.info("Aggregating " + numRecsInProcessedMap + " secondary records.");
+ if (LOG.isDebugEnabled() && numRecsInProcessedMap > secondTierThreshold) {
+ LOG.debug("Aggregating " + numRecsInProcessedMap + " secondary records.");
}
return (numRecsInProcessedMap > secondTierThreshold);
}
@@ -310,21 +366,21 @@ public class POPartialAgg extends Physic
if (spillingIterator != null) return;
if (!rawInputMap.isEmpty()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
}
aggregateFirstLevel();
- if (LOG.isInfoEnabled()) {
- LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processed inputs: " + numRecsInProcessedMap + " tuples.");
}
}
if (!processedInputMap.isEmpty()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
}
aggregateSecondLevel();
- if (LOG.isInfoEnabled()) {
- LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processed inputs: " + numRecsInProcessedMap + " tuples.");
}
}
doSpill = true;
@@ -335,7 +391,7 @@ public class POPartialAgg extends Physic
// if no more to spill, return EOP_RESULT.
if (processedInputMap.isEmpty()) {
spillingIterator = null;
- LOG.info("In spillResults(), processed map is empty -- done spilling.");
+ LOG.debug("In spillResults(), processed map is empty -- done spilling.");
return EOP_RESULT;
} else {
Map.Entry<Object, List<Tuple>> entry = spillingIterator.next();
@@ -536,8 +592,7 @@ public class POPartialAgg extends Physic
@Override
public long spill() {
- LOG.info("Spill triggered by SpillableMemoryManager");
- doSpill = true;
+ doContingentSpill = true;
return 0;
}
Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1631269&r1=1631268&r2=1631269&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Mon Oct 13 00:19:44 2014
@@ -50,7 +50,9 @@ public class SpillableMemoryManager impl
private final Log log = LogFactory.getLog(getClass());
- LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+ private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+ // Strong references to spillables
+ private LinkedList<SpillablePtr> spillablesSR = null;
// if we freed at least this much, invoke GC
// (default 40 MB - this can be overridden by user supplied property)
@@ -60,10 +62,6 @@ public class SpillableMemoryManager impl
// (default 5MB - this can be overridden by user supplied property)
private static long spillFileSizeThreshold = 5000000L ;
- // this will keep track of memory freed across spills
- // and between GC invocations
- private static long accumulatedFreeSize = 0L;
-
// fraction of biggest heap for which we want to get
// "memory usage threshold exceeded" notifications
private static double memoryThresholdFraction = 0.7;
@@ -77,11 +75,6 @@ public class SpillableMemoryManager impl
// log notification on collection threshold exceeded only the first time
private boolean firstCollectionThreshExceededLogged = false;
-
- // fraction of the total heap used for the threshold to determine
- // if we want to perform an extra gc before the spill
- private static double extraGCThresholdFraction = 0.05;
- private static long extraGCSpillSizeThreshold = 0L;
private static volatile SpillableMemoryManager manager;
@@ -90,7 +83,6 @@ public class SpillableMemoryManager impl
List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
MemoryPoolMXBean biggestHeap = null;
long biggestSize = 0;
- long totalSize = 0;
for (MemoryPoolMXBean b: mpbeans) {
log.debug("Found heap (" + b.getName() +
") of type " + b.getType());
@@ -99,14 +91,12 @@ public class SpillableMemoryManager impl
* heap is the tenured heap
*/
long size = b.getUsage().getMax();
- totalSize += size;
if (size > biggestSize) {
biggestSize = size;
biggestHeap = b;
}
}
}
- extraGCSpillSizeThreshold = (long) (totalSize * extraGCThresholdFraction);
if (biggestHeap == null) {
throw new RuntimeException("Couldn't find heap");
}
@@ -189,6 +179,7 @@ public class SpillableMemoryManager impl
}
}
+ // Remove empty spillables to improve sort speed.
clearSpillables();
if (toFree < 0) {
log.debug("low memory handler returning " +
@@ -196,16 +187,31 @@ public class SpillableMemoryManager impl
return;
}
synchronized(spillables) {
- Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {
-
- /**
- * We don't lock anything, so this sort may not be stable if a WeakReference suddenly
- * becomes null, but it will be close enough.
- * Also between the time we sort and we use these spillables, they
- * may actually change in size - so this is just best effort
- */
+ /**
+ * Store a reference to a spillable and its size into a stable
+ * list so that the comparator sees consistent sizes during the sort (required by the Java 7 sort contract).
+ * Between the time we sort and we use these spillables, they
+ * may actually change in size.
+ */
+ spillablesSR = new LinkedList<SpillablePtr>();
+ for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
+ // Check that the object still exists before adding to the Strong Referenced list.
+ Spillable s = i.next().get();
+ if (s == null) {
+ i.remove();
+ continue;
+ }
+ // Get a ptr to the spillable and its current size.
+ // we need a stable size for sorting.
+ spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
+ }
+ log.debug("Spillables list size: " + spillablesSR.size());
+ Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
+ // Sort the list in descending order. We spill the biggest items first,
+ // and only as many as we need to reduce memory usage
+ // below the threshold.
@Override
- public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
+ public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
Spillable o1 = o1Ref.get();
Spillable o2 = o2Ref.get();
if (o1 == null && o2 == null) {
@@ -231,77 +237,64 @@ public class SpillableMemoryManager impl
});
long estimatedFreed = 0;
int numObjSpilled = 0;
- boolean invokeGC = false;
- boolean extraGCCalled = false;
- for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
- WeakReference<Spillable> weakRef = i.next();
- Spillable s = weakRef.get();
- // Still need to check for null here, even after we removed
- // above, because the reference may have gone bad on us
- // since the last check.
- if (s == null) {
- i.remove();
- continue;
- }
- long toBeFreed = s.getMemorySize();
+ /*
+ * Before PIG-3979, Pig invoked System.gc inside this hook,
+ * but calling gc from within a gc notification seems to cause a lot of gc activity.
+ * More accurately, on Java implementations in which System.gc() does force a gc,
+ * this causes a lot of looping. On systems in which System.gc() is just a suggestion,
+ * relying on the outcome of the System.gc() call is a mistake.
+ *
+ * Therefore, this version of the code does away with attempts to guess what happens
+ * when we call s.spill(). For POPartialAgg spillables, the call tells the spillable
+ * to reduce itself. No data is necessarily written to disk.
+ */
+ for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
+ SpillablePtr sPtr = i.next();
+ long toBeFreed = sPtr.getMemorySize();
log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
// Don't keep trying if the rest of files are too small
if (toBeFreed < spillFileSizeThreshold) {
log.debug("spilling small files - getting out of memory handler");
break ;
}
- // If single Spillable is bigger than the threshold,
- // we force GC to make sure we really need to keep this
- // object before paying for the expensive spill().
- // Done at most once per handleNotification.
- if( !extraGCCalled && extraGCSpillSizeThreshold != 0
- && toBeFreed > extraGCSpillSizeThreshold ) {
- log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
- // this extra assignment to null is needed so that gc can free the
- // spillable if nothing else is pointing at it
- s = null;
- System.gc();
- extraGCCalled = true;
- // checking again to see if this reference is still valid
- s = weakRef.get();
- if (s == null) {
- i.remove();
- accumulatedFreeSize = 0;
- invokeGC = false;
- continue;
- }
- }
- s.spill();
+ Spillable s = sPtr.get();
+ if (s != null)
+ s.spill();
numObjSpilled++;
estimatedFreed += toBeFreed;
- accumulatedFreeSize += toBeFreed;
- // This should significantly reduce the number of small files
- // in case that we have a lot of nested bags
- if (accumulatedFreeSize > gcActivationSize) {
- invokeGC = true;
- }
-
+
if (estimatedFreed > toFree) {
log.debug("Freed enough space - getting out of memory handler");
- invokeGC = true;
break;
}
- }
- /* Poke the GC again to see if we successfully freed enough memory */
- if(invokeGC) {
- System.gc();
- // now that we have invoked the GC, reset accumulatedFreeSize
- accumulatedFreeSize = 0;
}
- if(estimatedFreed > 0){
+ // We are done with the strongly referenced list of spillables
+ spillablesSR = null;
+
+ if (estimatedFreed > 0) {
String msg = "Spilled an estimate of " + estimatedFreed +
" bytes from " + numObjSpilled + " objects. " + info.getUsage();;
- log.info(msg);
+ log.debug(msg);
}
}
}
+ public static class SpillablePtr {
+ private WeakReference<Spillable> spillable;
+ private long size;
+ SpillablePtr(Spillable p, long s) {
+ spillable = new WeakReference<Spillable>(p);
+ size = s;
+ }
+ public Spillable get() {
+ return spillable.get();
+ }
+ public long getMemorySize() {
+ return size;
+ }
+ }
+
public void clearSpillables() {
synchronized (spillables) {
// Walk the list first and remove nulls, otherwise the sort
@@ -322,7 +315,7 @@ public class SpillableMemoryManager impl
*/
public void registerSpillable(Spillable s) {
synchronized(spillables) {
- // Cleaing the entire list is too expensive. Just trim off the front while
+ // Cleaning the entire list is too expensive. Just trim off the front while
// we can.
WeakReference<Spillable> first = spillables.peek();
while (first != null && first.get() == null) {