You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by dv...@apache.org on 2013/03/13 01:52:36 UTC
svn commit: r1455767 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
Author: dvryaboy
Date: Wed Mar 13 00:52:35 2013
New Revision: 1455767
URL: http://svn.apache.org/r1455767
Log:
PIG-3241: ConcurrentModificationException in POPartialAgg
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1455767&r1=1455766&r2=1455767&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 13 00:52:35 2013
@@ -231,6 +231,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3241: ConcurrentModificationException in POPartialAgg (dvryaboy)
+
PIG-3144: Erroneous map entry alias resolution leading to "Duplicate schema alias" errors (jcoveney via cheolsoo)
PIGG-3212: Race Conditions in POSort and (Internal)SortedBag during Proactive Spill (kadeng via dvryaboy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1455767&r1=1455766&r2=1455767&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Wed Mar 13 00:52:35 2013
@@ -101,7 +101,7 @@ public class POPartialAgg extends Physic
private boolean disableMapAgg = false;
private boolean sizeReductionChecked = false;
private boolean inputsExhausted = false;
- private boolean doSpill = false;
+ private volatile boolean doSpill = false;
private transient MemoryLimits memLimits;
private transient boolean initialized = false;
@@ -123,7 +123,7 @@ public class POPartialAgg extends Physic
if (percent <= 0) {
LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
disableMapAgg();
- }
+ }
initialized = true;
SpillableMemoryManager.getInstance().registerSpillable(this);
}
@@ -143,7 +143,7 @@ public class POPartialAgg extends Physic
if (!initialized && !ALL_POPARTS.containsKey(this)) {
init();
- }
+ }
while (true) {
if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
@@ -151,48 +151,49 @@ public class POPartialAgg extends Physic
}
if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
estimateMemThresholds();
- }
+ }
if (doSpill) {
+ startSpill();
Result result = spillResult();
if (result == EOP_RESULT) {
doSpill = false;
- }
+ }
if (result != EOP_RESULT || inputsExhausted) {
return result;
- }
- }
+ }
+ }
if (mapAggDisabled()) {
// disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps.
// if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
return processInput();
} else {
- Result inp = processInput();
- if (inp.returnStatus == POStatus.STATUS_ERR) {
- return inp;
+ Result inp = processInput();
+ if (inp.returnStatus == POStatus.STATUS_ERR) {
+ return inp;
} else if (inp.returnStatus == POStatus.STATUS_EOP) {
- if (parentPlan.endOfAllInput) {
+ if (parentPlan.endOfAllInput) {
// parent input is over. flush what we have.
inputsExhausted = true;
startSpill();
LOG.info("Spilling last bits.");
continue;
- } else {
+ } else {
return EOP_RESULT;
- }
+ }
} else if (inp.returnStatus == POStatus.STATUS_NULL) {
- continue;
+ continue;
} else {
// add this input to map.
- Tuple inpTuple = (Tuple) inp.result;
- keyPlan.attachInput(inpTuple);
+ Tuple inpTuple = (Tuple) inp.result;
+ keyPlan.attachInput(inpTuple);
- // evaluate the key
- Result keyRes = getResult(keyLeaf);
- if (keyRes == ERR_RESULT) {
- return ERR_RESULT;
- }
- Object key = keyRes.result;
- keyPlan.detachInput();
+ // evaluate the key
+ Result keyRes = getResult(keyLeaf);
+ if (keyRes == ERR_RESULT) {
+ return ERR_RESULT;
+ }
+ Object key = keyRes.result;
+ keyPlan.detachInput();
numRecsInRawMap += 1;
addKeyValToMap(rawInputMap, key, inpTuple);
@@ -225,7 +226,7 @@ public class POPartialAgg extends Physic
int mem = (int) t.getMemorySize();
estTotalMem += mem;
memLimits.addNewObjSize(mem);
- }
+ }
}
avgTupleSize = estTotalMem / estTuples;
int totalTuples = memLimits.getCacheLimit();
@@ -235,7 +236,7 @@ public class POPartialAgg extends Physic
LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
}
estimatedMemThresholds = true;
- }
+ }
private void checkSizeReduction() throws ExecException {
int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap;
@@ -262,14 +263,14 @@ public class POPartialAgg extends Physic
private boolean mapAggDisabled() {
return disableMapAgg;
- }
+ }
private boolean shouldAggregateFirstLevel() {
if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
}
return (numRecsInRawMap > firstTierThreshold);
- }
+ }
private boolean shouldAggregateSecondLevel() {
if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
@@ -290,21 +291,24 @@ public class POPartialAgg extends Physic
value = new ArrayList<Tuple>();
map.put(key, value);
}
- value.add(inpTuple);
- if (value.size() >= MAX_LIST_SIZE) {
- boolean isFirst = (map == rawInputMap);
+ value.add(inpTuple);
+ if (value.size() >= MAX_LIST_SIZE) {
+ boolean isFirst = (map == rawInputMap);
if (LOG.isDebugEnabled()){
LOG.debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
- }
- if (isFirst) {
- aggregateRawRow(key);
- } else {
- aggregateSecondLevel();
- }
- }
- }
+ }
+ if (isFirst) {
+ aggregateRawRow(key);
+ } else {
+ aggregateSecondLevel();
+ }
+ }
+ }
private void startSpill() throws ExecException {
+ // If spillingIterator is not null, we are already spilling and don't need to set up.
+ if (spillingIterator != null) return;
+
if (!rawInputMap.isEmpty()) {
if (LOG.isInfoEnabled()) {
LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
@@ -322,14 +326,15 @@ public class POPartialAgg extends Physic
if (LOG.isInfoEnabled()) {
LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
}
- }
+ }
doSpill = true;
spillingIterator = processedInputMap.entrySet().iterator();
- }
+ }
private Result spillResult() throws ExecException {
// if no more to spill, return EOP_RESULT.
if (processedInputMap.isEmpty()) {
+ spillingIterator = null;
LOG.info("In spillResults(), processed map is empty -- done spilling.");
return EOP_RESULT;
} else {
@@ -365,9 +370,9 @@ public class POPartialAgg extends Physic
iter.remove();
addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
numEntriesInTarget += valueTuple.size() - 1;
- }
- return numEntriesInTarget;
}
+ return numEntriesInTarget;
+ }
private void aggregateFirstLevel() throws ExecException {
numRecsInProcessedMap = aggregate(rawInputMap, processedInputMap, numRecsInProcessedMap);
@@ -436,7 +441,7 @@ public class POPartialAgg extends Physic
PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
if (usage != null) {
percent = Float.parseFloat(usage);
- }
+ }
}
return percent;
}
@@ -536,12 +541,8 @@ public class POPartialAgg extends Physic
@Override
public long spill() {
- try {
- LOG.info("Spill triggered by SpillableMemoryManager");
- startSpill();
- } catch (ExecException e) {
- throw new RuntimeException(e);
- }
+ LOG.info("Spill triggered by SpillableMemoryManager");
+ doSpill = true;
return 0;
}