You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2012/12/07 00:46:24 UTC
svn commit: r1418135 - in /pig/branches/branch-0.11: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
src/org/apache/pig/data/DefaultAbstractBag.java
Author: dvryaboy
Date: Thu Dec 6 23:46:24 2012
New Revision: 1418135
URL: http://svn.apache.org/viewvc?rev=1418135&view=rev
Log:
PIG-3044: Trigger POPartialAgg compaction under GC pressure
Modified:
pig/branches/branch-0.11/CHANGES.txt
pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java
Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1418135&r1=1418134&r2=1418135&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Thu Dec 6 23:46:24 2012
@@ -30,6 +30,8 @@ PIG-1891 Enable StoreFunc to make intell
IMPROVEMENTS
+PIG-3044: Trigger POPartialAgg compaction under GC pressure (dvryaboy)
+
PIG-2907: Publish pig jars for Hadoop2/23 to maven (rohini)
PIG-2934: HBaseStorage filter optimizations (billgraham)
Modified: pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1418135&r1=1418134&r2=1418135&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Thu Dec 6 23:46:24 2012
@@ -44,6 +44,9 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Spillable;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+
import com.google.common.collect.Maps;
/**
@@ -53,7 +56,7 @@ import com.google.common.collect.Maps;
* map. Once that map fills up or all input has been seen, results are
* piped out into the next operator (caller of getNext()).
*/
-public class POPartialAgg extends PhysicalOperator {
+public class POPartialAgg extends PhysicalOperator implements Spillable {
private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
private static final long serialVersionUID = 1L;
@@ -104,9 +107,10 @@ public class POPartialAgg extends Physic
private transient boolean initialized = false;
private int firstTierThreshold = FIRST_TIER_THRESHOLD;
private int secondTierThreshold = SECOND_TIER_THRESHOLD;
- private int sizeReduction;
+ private int sizeReduction = 1;
+ private int avgTupleSize = 0;
private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
- private boolean estimatedMemThresholds;
+ private boolean estimatedMemThresholds = false;
public POPartialAgg(OperatorKey k) {
@@ -121,6 +125,7 @@ public class POPartialAgg extends Physic
disableMapAgg();
}
initialized = true;
+ SpillableMemoryManager.getInstance().registerSpillable(this);
}
@Override
@@ -222,6 +227,7 @@ public class POPartialAgg extends Physic
memLimits.addNewObjSize(mem);
}
}
+ avgTupleSize = estTotalMem / estTuples;
int totalTuples = memLimits.getCacheLimit();
LOG.info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
@@ -287,8 +293,8 @@ public class POPartialAgg extends Physic
value.add(inpTuple);
if (value.size() >= MAX_LIST_SIZE) {
boolean isFirst = (map == rawInputMap);
- if (LOG.isInfoEnabled()){
- LOG.info("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
+ if (LOG.isDebugEnabled()){
+ LOG.debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
}
if (isFirst) {
aggregateRawRow(key);
@@ -526,4 +532,20 @@ public class POPartialAgg extends Physic
}
}
+ /**
+  * Handles a spill request by starting this operator's spill through
+  * {@code startSpill()}.
+  *
+  * @return always 0; this method does not report how many bytes were freed
+  * @throws RuntimeException wrapping any {@code ExecException} raised by
+  *         {@code startSpill()}
+  */
+ @Override
+ public long spill() {
+     LOG.info("Spill triggered by SpillableMemoryManager");
+     try {
+         startSpill();
+     } catch (ExecException spillFailure) {
+         // The overridden signature declares no checked exceptions, so rethrow unchecked.
+         throw new RuntimeException(spillFailure);
+     }
+     return 0;
+ }
+
+ /**
+  * Estimates the memory held by this operator as the average tuple size
+  * multiplied by the number of records counted in the processed and raw maps.
+  *
+  * @return estimated size in bytes; 0 while {@code avgTupleSize} is still at
+  *         its initial value of 0
+  */
+ @Override
+ public long getMemorySize() {
+     // Widen to long before adding and multiplying: avgTupleSize is an int, so
+     // int arithmetic here could wrap once the estimate passes Integer.MAX_VALUE.
+     long bufferedRecords = (long) numRecsInProcessedMap + numRecsInRawMap;
+     return avgTupleSize * bufferedRecords;
+ }
+
}
Modified: pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1418135&r1=1418134&r2=1418135&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/data/DefaultAbstractBag.java Thu Dec 6 23:46:24 2012
@@ -67,7 +67,7 @@ public abstract class DefaultAbstractBag
protected int mLastContentsSize = -1;
- protected long mMemSize = 0;
+ protected long avgTupleSize = 0;
private boolean spillableRegistered = false;
@@ -130,33 +130,40 @@ public abstract class DefaultAbstractBag
*/
@Override
public long getMemorySize() {
- int j;
+ int j = 0;
int numInMem = 0;
- long used = 0;
synchronized (mContents) {
- if (mLastContentsSize == mContents.size()) return mMemSize;
+ numInMem = mContents.size();
+
+
+ // If we've already gotten the estimate
+ // and the number of tuples hasn't changed, or was above 100 and
+ // is still above 100, we can
+ // produce a new estimate without sampling the tuples again.
+ if (avgTupleSize != 0 && (mLastContentsSize == numInMem ||
+ mLastContentsSize > 100 && numInMem > 100))
+ return totalSizeFromAvgTupleSize(avgTupleSize, numInMem);
- // I can't afford to talk through all the tuples every time the
+ // Measure only what's in memory, not what's on disk.
+ // I can't afford to walk through all the tuples every time the
// memory manager wants to know if it's time to dump. Just sample
// the first 100 and see what we get. This may not be 100%
// accurate, but it's just an estimate anyway.
- numInMem = mContents.size();
- // Measure only what's in memory, not what's on disk.
Iterator<Tuple> i = mContents.iterator();
for (j = 0; i.hasNext() && j < 100; j++) {
- used += i.next().getMemorySize();
+ avgTupleSize += i.next().getMemorySize();
}
- mLastContentsSize = numInMem;
}
- if (numInMem > 100) {
- // Estimate the per tuple size. Do it in integer arithmetic
- // (even though it will be slightly less accurate) for speed.
- used /= j;
- used *= numInMem;
+ mLastContentsSize = numInMem;
+ avgTupleSize /= j;
+ return totalSizeFromAvgTupleSize(avgTupleSize, numInMem);
}
+ private long totalSizeFromAvgTupleSize(long avgTupleSize, int numInMem) {
+ long used = avgTupleSize * numInMem;
+
// add up the overhead for this object and other object variables
int bag_fix_size = 8 /* object header */
+ 4 + 8 + 8 /* mLastContentsSize + mMemSize + mSize */
@@ -185,11 +192,9 @@ public abstract class DefaultAbstractBag
used += mSpillFiles.size() * approx_per_entry_size;
}
}
-
- mMemSize = used;
return used;
- }
+ }
/**
* Memory size of objects are rounded to multiple of 8 bytes