You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2013/03/13 01:52:36 UTC

svn commit: r1455767 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java

Author: dvryaboy
Date: Wed Mar 13 00:52:35 2013
New Revision: 1455767

URL: http://svn.apache.org/r1455767
Log:
PIG-3241: ConcurrentModificationException in POPartialAgg 

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1455767&r1=1455766&r2=1455767&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 13 00:52:35 2013
@@ -231,6 +231,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-3241: ConcurrentModificationException in POPartialAgg (dvryaboy)
+
 PIG-3144: Erroneous map entry alias resolution leading to "Duplicate schema alias" errors (jcoveney via cheolsoo)
 
 PIGG-3212: Race Conditions in POSort and (Internal)SortedBag during Proactive Spill (kadeng via dvryaboy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1455767&r1=1455766&r2=1455767&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Wed Mar 13 00:52:35 2013
@@ -101,7 +101,7 @@ public class POPartialAgg extends Physic
     private boolean disableMapAgg = false;
     private boolean sizeReductionChecked = false;
     private boolean inputsExhausted = false;
-    private boolean doSpill = false;
+    private volatile boolean doSpill = false;
     private transient MemoryLimits memLimits;
 
     private transient boolean initialized = false;
@@ -123,7 +123,7 @@ public class POPartialAgg extends Physic
         if (percent <= 0) {
             LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
             disableMapAgg();
-    }
+        }
         initialized = true;
         SpillableMemoryManager.getInstance().registerSpillable(this);
     }
@@ -143,7 +143,7 @@ public class POPartialAgg extends Physic
 
         if (!initialized && !ALL_POPARTS.containsKey(this)) {
             init();
-                }
+        }
 
         while (true) {
             if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
@@ -151,48 +151,49 @@ public class POPartialAgg extends Physic
             }
             if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
                 estimateMemThresholds();
-        }
+            }
             if (doSpill) {
+                startSpill();
                 Result result = spillResult();
                 if (result == EOP_RESULT) {
                     doSpill = false;
-            }
+                }
                 if (result != EOP_RESULT || inputsExhausted) {
                     return result;
-        }
-        }
+                }
+            }
             if (mapAggDisabled()) {
                 // disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps.
                 // if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
                 return processInput();
             } else {
-            Result inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_ERR) {
-                return inp;
+                Result inp = processInput();
+                if (inp.returnStatus == POStatus.STATUS_ERR) {
+                    return inp;
                 } else if (inp.returnStatus == POStatus.STATUS_EOP) {
-                if (parentPlan.endOfAllInput) {
+                    if (parentPlan.endOfAllInput) {
                         // parent input is over. flush what we have.
                         inputsExhausted = true;
                         startSpill();
                         LOG.info("Spilling last bits.");
                         continue;
-                } else {
+                    } else {
                         return EOP_RESULT;
-            }
+                    }
                 } else if (inp.returnStatus == POStatus.STATUS_NULL) {
-                continue;
+                    continue;
                 } else {
                     // add this input to map.
-            Tuple inpTuple = (Tuple) inp.result;
-            keyPlan.attachInput(inpTuple);
+                    Tuple inpTuple = (Tuple) inp.result;
+                    keyPlan.attachInput(inpTuple);
 
-            // evaluate the key
-            Result keyRes = getResult(keyLeaf);
-            if (keyRes == ERR_RESULT) {
-                return ERR_RESULT;
-            }
-            Object key = keyRes.result;
-            keyPlan.detachInput();
+                    // evaluate the key
+                    Result keyRes = getResult(keyLeaf);
+                    if (keyRes == ERR_RESULT) {
+                        return ERR_RESULT;
+                    }
+                    Object key = keyRes.result;
+                    keyPlan.detachInput();
                     numRecsInRawMap += 1;
                     addKeyValToMap(rawInputMap, key, inpTuple);
 
@@ -225,7 +226,7 @@ public class POPartialAgg extends Physic
                     int mem = (int) t.getMemorySize();
                     estTotalMem += mem;
                     memLimits.addNewObjSize(mem);
-                    }
+                }
             }
             avgTupleSize = estTotalMem / estTuples;
             int totalTuples = memLimits.getCacheLimit();
@@ -235,7 +236,7 @@ public class POPartialAgg extends Physic
             LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
         }
         estimatedMemThresholds = true;
-                    }
+    }
 
     private void checkSizeReduction() throws ExecException {
         int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap;
@@ -262,14 +263,14 @@ public class POPartialAgg extends Physic
 
     private boolean mapAggDisabled() {
         return disableMapAgg;
-                    }
+    }
 
     private boolean shouldAggregateFirstLevel() {
         if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
             LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
         }
         return (numRecsInRawMap > firstTierThreshold);
-                    }
+    }
 
     private boolean shouldAggregateSecondLevel() {
         if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
@@ -290,21 +291,24 @@ public class POPartialAgg extends Physic
             value = new ArrayList<Tuple>();
             map.put(key, value);
         }
-       value.add(inpTuple);
-       if (value.size() >= MAX_LIST_SIZE) {
-           boolean isFirst = (map == rawInputMap);
+        value.add(inpTuple);
+        if (value.size() >= MAX_LIST_SIZE) {
+            boolean isFirst = (map == rawInputMap);
             if (LOG.isDebugEnabled()){
                 LOG.debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
-           }
-           if (isFirst) {
-               aggregateRawRow(key);
-                    } else {
-               aggregateSecondLevel();
-           }
-       }
-                    }
+            }
+            if (isFirst) {
+                aggregateRawRow(key);
+            } else {
+                aggregateSecondLevel();
+            }
+        }
+    }
 
     private void startSpill() throws ExecException {
+        // If spillingIterator is non-null, we are already spilling and don't need to set up.
+        if (spillingIterator != null) return;
+
         if (!rawInputMap.isEmpty()) {
             if (LOG.isInfoEnabled()) {
                 LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
@@ -322,14 +326,15 @@ public class POPartialAgg extends Physic
             if (LOG.isInfoEnabled()) {
                 LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
             }
-                }
+        }
         doSpill = true;
         spillingIterator = processedInputMap.entrySet().iterator();
-            }
+    }
 
     private Result spillResult() throws ExecException {
         // if no more to spill, return EOP_RESULT.
         if (processedInputMap.isEmpty()) {
+            spillingIterator = null;
             LOG.info("In spillResults(), processed map is empty -- done spilling.");
             return EOP_RESULT;
         } else {
@@ -365,9 +370,9 @@ public class POPartialAgg extends Physic
             iter.remove();
             addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
             numEntriesInTarget += valueTuple.size() - 1;
-            }
-        return numEntriesInTarget;
         }
+        return numEntriesInTarget;
+    }
 
     private void aggregateFirstLevel() throws ExecException {
         numRecsInProcessedMap = aggregate(rawInputMap, processedInputMap, numRecsInProcessedMap);
@@ -436,7 +441,7 @@ public class POPartialAgg extends Physic
                     PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
             if (usage != null) {
                 percent = Float.parseFloat(usage);
-        }
+            }
         }
         return percent;
     }
@@ -536,12 +541,8 @@ public class POPartialAgg extends Physic
 
     @Override
     public long spill() {
-        try {
-            LOG.info("Spill triggered by SpillableMemoryManager");
-            startSpill();
-        } catch (ExecException e) {
-            throw new RuntimeException(e);
-        }
+        LOG.info("Spill triggered by SpillableMemoryManager");
+        doSpill = true;
         return 0;
     }