Posted to commits@pig.apache.org by dv...@apache.org on 2012/08/30 07:10:35 UTC
svn commit: r1378801 [1/2] - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/builtin/ src/or...
Author: dvryaboy
Date: Thu Aug 30 05:10:34 2012
New Revision: 1378801
URL: http://svn.apache.org/viewvc?rev=1378801&view=rev
Log:
PIG-2888: Improve performance of POPartialAgg
Modified:
pig/trunk/CHANGES.txt
pig/trunk/build.xml
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/builtin/Distinct.java
pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java
pig/trunk/test/org/apache/pig/test/TestPOCast.java
pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 30 05:10:34 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2888: Improve performance of POPartialAgg (dvryaboy)
+
PIG-2708: split MiniCluster based tests out of org.apache.pig.test.TestInputOutputFileValidator (analog.sony via daijy)
PIG-2890: Revert PIG-2578 (dvryaboy)
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Thu Aug 30 05:10:34 2012
@@ -298,6 +298,8 @@
</path>
<path id="test.classpath">
+ <!-- need to put this first, otherwise junit-3 testcases can break -->
+ <pathelement location="${ivy.lib.dir}/junit-3.8.1.jar"/>
<pathelement location="${build.classes}"/>
<pathelement location="${test.src.dir}"/>
<pathelement location="contrib/piggybank/java/piggybank.jar"/>
@@ -783,6 +785,20 @@
</target>
<!-- ================================================================== -->
+ <!-- Make pigperf.jar -->
+ <!-- ================================================================== -->
+ <target name="pigperf" depends="compile-test" description="Create pigperf.jar">
+ <jar jarfile="pigperf.jar">
+ <fileset dir="${test.build.dir}/classes">
+ <include name="org/apache/pig/test/pigmix/**"/>
+ <include name="org/apache/pig/test/utils/datagen/*"/>
+ <include name="org/apache/pig/test/udf/storefunc/*"/>
+ </fileset>
+ <zipfileset src="${lib.dir}/sdsuLibJKD12.jar" />
+ </jar>
+ </target>
+
+ <!-- ================================================================== -->
<!-- Run unit tests -->
<!-- ================================================================== -->
<target name="test-core" depends="compile-test,jar-withouthadoop,debugger.check" description="Run full set of unit tests">
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Thu Aug 30 05:10:34 2012
@@ -24,8 +24,31 @@ package org.apache.pig;
public class PigConfiguration {
/**
+ * Controls the fraction of total memory that is allowed to be used by
+ * cached bags. Default is 0.2.
+ */
+ public static final String PROP_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
+
+ /**
+ * Controls whether in-map partial aggregation is turned on.
+ */
+ public static final String PROP_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
+
+ /**
+ * Controls the minimum reduction that in-mapper partial aggregation must achieve
+ * in order to stay enabled. If this reduction is not achieved after a period of
+ * observation, in-mapper aggregation is turned off and a message is logged to that effect.
+ */
+ public static final String PARTAGG_MINREDUCTION = "pig.exec.mapPartAgg.minReduction";
+
+ /**
* Controls whether execution time of Pig UDFs should be tracked.
* This feature uses counters; use judiciously.
*/
public static final String TIME_UDFS_PROP = "pig.udf.profile";
+
+ /**
+ * Turns off use of combiners in MapReduce jobs produced by Pig.
+ */
+ public static final String PROP_NO_COMBINER = "pig.exec.nocombiner";
}
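For illustration only (not part of this commit): a minimal sketch of how these properties might be set programmatically. PigServer, ExecType.LOCAL, and the property values are assumptions for the example; properties set this way are expected to flow into the job configuration that POPartialAgg and the bags read from.

    // Sketch: enabling in-map partial aggregation via the new
    // PigConfiguration constants. PigServer/ExecType.LOCAL are
    // illustrative assumptions, not part of this commit.
    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigConfiguration;
    import org.apache.pig.PigServer;

    public class PartAggSetup {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            Properties props = pig.getPigContext().getProperties();
            // Turn on in-map partial aggregation (off by default).
            props.setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "true");
            // Keep it on only if it reduces the record count 10x or better.
            props.setProperty(PigConfiguration.PARTAGG_MINREDUCTION, "10");
            // Let cached bags and aggregation buffers use 20% of the heap.
            props.setProperty(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE, "0.2");
        }
    }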
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Aug 30 05:10:34 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.RunningJ
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.PigRunner.ReturnCode;
@@ -80,9 +81,6 @@ public class MapReduceLauncher extends L
public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
- public static final String PROP_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
-
-
private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
//used to track the exception thrown by the job control which is run in a separate thread
@@ -561,10 +559,10 @@ public class MapReduceLauncher extends L
pc.getProperties().getProperty(
"last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
- String prop = pc.getProperties().getProperty("pig.exec.nocombiner");
+ String prop = pc.getProperties().getProperty(PigConfiguration.PROP_NO_COMBINER);
if (!pc.inIllustrator && !("true".equals(prop))) {
boolean doMapAgg =
- Boolean.valueOf(pc.getProperties().getProperty(PROP_EXEC_MAP_PARTAGG,"false"));
+ Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG,"false"));
CombinerOptimizer co = new CombinerOptimizer(plan, doMapAgg);
co.visit();
//display the warning message(s) from the CombinerOptimizer
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Thu Aug 30 05:10:34 2012
@@ -18,13 +18,15 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.WeakHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -34,180 +36,148 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
-import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.SelfSpillBag.MemoryLimits;
-import org.apache.pig.data.SizeUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import com.google.common.collect.Maps;
/**
- * Do partial aggregation in map plan. It uses a hash-map to aggregate. If
- * consecutive records have same key, it will aggregate those without adding
- * them to the hash-map. As future optimization, the use of hash-map could be
- * disabled when input data is sorted on group-by keys
+ * Do partial aggregation in the map plan. Inputs are buffered up in
+ * a hashmap until a threshold is reached; the combiner functions are
+ * then fed the buffered inputs, and the results are stored in a
+ * secondary map. Once that map fills up, or all input has been seen,
+ * results are piped out into the next operator (the caller of getNext()).
*/
public class POPartialAgg extends PhysicalOperator {
-
- public static final String PROP_PARTAGG_MINREDUCTION = "pig.exec.mapPartAgg.minReduction";
-
- private static final Log log = LogFactory.getLog(POPartialAgg.class);
+ private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
private static final long serialVersionUID = 1L;
- private PhysicalPlan keyPlan;
- private ExpressionOperator keyLeaf;
-
- private List<PhysicalPlan> valuePlans;
- private List<ExpressionOperator> valueLeaves;
private static final Result ERR_RESULT = new Result();
private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
null);
- // run time variables
- private transient Object currentKey = null;
- private transient Map<Object, Tuple> aggMap;
- // tuple of the format - (null(key),bag-val1,bag-val2,...)
- // attach this to the plans with algebraic udf before evaluating the plans
- private transient Tuple valueTuple = null;
+ // number of records to sample in order to estimate the average size used by
+ // each entry in the hash map and the observed reduction
+ private static final int NUM_RECS_TO_SAMPLE = 10000;
+
+ // We want to avoid massive ArrayList copies as they get big.
+ // ArrayLists grow by prevSize + prevSize/2. Given the default initial capacity of 10,
+ // 9369 is the capacity after 17 such resizings. This seems like a sufficiently
+ // large value to trigger spilling/aggregation instead of paying for yet another data
+ // copy.
+ private static final int MAX_LIST_SIZE = 9368;
- private boolean isFinished = false;
+ private static final int DEFAULT_MIN_REDUCTION = 10;
- private transient Iterator<Tuple> mapDumpIterator;
- private transient int numToDump;
+ // TODO: these are temporary. The real thing should use memory usage estimation.
+ private static final int FIRST_TIER_THRESHOLD = 20000;
+ private static final int SECOND_TIER_THRESHOLD = FIRST_TIER_THRESHOLD / DEFAULT_MIN_REDUCTION;
- // maximum bag size of currentValues cached before aggregation is done
- private static final int MAX_SIZE_CURVAL_CACHE = 1024;
+ private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<POPartialAgg, Byte>();
- // number of records to sample to determine average size used by each
- // entry in hash map
- private static final int NUM_RESRECS_TO_SAMPLE_SZ_ESTIMATE = 100;
+ private static final TupleFactory TF = TupleFactory.getInstance();
+ private static final BagFactory BG = BagFactory.getInstance();
- // params for auto disabling map aggregation
- private static final int NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION = 1000;
+ private PhysicalPlan keyPlan;
+ private ExpressionOperator keyLeaf;
- private static final int DEFAULT_MIN_REDUCTION = 10;
+ private List<PhysicalPlan> valuePlans;
+ private List<ExpressionOperator> valueLeaves;
+
+ private int numRecsInRawMap = 0;
+ private int numRecsInProcessedMap = 0;
+
+ private Map<Object, List<Tuple>> rawInputMap = Maps.newHashMap();
+ private Map<Object, List<Tuple>> processedInputMap = Maps.newHashMap();
private boolean disableMapAgg = false;
- private int num_inp_recs;
private boolean sizeReductionChecked = false;
+ private boolean inputsExhausted = false;
+ private boolean doSpill = false;
+ private transient MemoryLimits memLimits;
- private transient int maxHashMapSize;
+ private transient boolean initialized = false;
+ private int firstTierThreshold = FIRST_TIER_THRESHOLD;
+ private int secondTierThreshold = SECOND_TIER_THRESHOLD;
+ private int sizeReduction;
+ private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
+ private boolean estimatedMemThresholds;
- private transient TupleFactory tupleFact;
- private transient MemoryLimits memLimits;
public POPartialAgg(OperatorKey k) {
super(k);
}
- @Override
- public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
- // combiner optimizer does not get invoked if the plan is being executed
- // under illustrate, so POPartialAgg should not get used in that case
- throw new UnsupportedOperationException();
+ private void init() throws ExecException {
+ ALL_POPARTS.put(this, null);
+ float percent = getPercentUsageFromProp();
+ if (percent <= 0) {
+ LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
+ disableMapAgg();
}
-
- @Override
- public void visit(PhyPlanVisitor v) throws VisitorException {
- v.visitPartialAgg(this);
+ initialized = true;
}
@Override
- public Result getNext(Tuple t) throws ExecException {
+ public Result getNext(Tuple __ignored__) throws ExecException {
+ // Accumulate tuples from processInput in rawInputMap.
+ // When the map grows to the memory limit, go over each entry in it and feed
+ // each collection to the combiner aggs, storing the results in
+ // processedInputMap and clearing out rawInputMap.
+ // Memory usage is updated every time we modify either of the maps.
+ // When processedInputMap is >= 20% of the allotted memory, run the aggs on
+ // it too, and output the results as the returns of successive calls of this
+ // method; then reset processedInputMap.
+ // The fact that we are in this latter, draining stage is communicated via
+ // the doSpill flag.
- if (disableMapAgg) {
- // map aggregation has been automatically disabled
- if (mapDumpIterator != null) {
- // there are some accumulated entries in map to be dumped
- return getNextResFromMap();
- } else {
- Result inp = processInput();
- if (disableMapAgg) {
- // the in-map partial aggregation is an optional step, just
- // like the combiner.
- // act as if this operator was never there, by just
- // returning the input
- return inp;
+ if (!initialized && !ALL_POPARTS.containsKey(this)) {
+ init();
}
- }
- }
- if (mapDumpIterator != null) {
- // if this iterator is not null, we are process of dumping records
- // from the map
- if (isFinished) {
- return getNextResFromMap();
- } else if (numToDump > 0) {
- // there are some tuples yet to be dumped, to free memory
- --numToDump;
- return getNextResFromMap();
- } else {
- mapDumpIterator = null;
+ while (true) {
+ if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+ checkSizeReduction();
}
+ if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+ estimateMemThresholds();
}
-
- if (isFinished) {
- // done with dumping all records
- return new Result(POStatus.STATUS_EOP, null);
- }
-
- while (true) {
- //process each input until EOP
+ if (doSpill) {
+ Result result = spillResult();
+ if (result == EOP_RESULT) {
+ doSpill = false;
+ }
+ if (result != EOP_RESULT || inputsExhausted) {
+ return result;
+ }
+ }
+ if (mapAggDisabled()) {
+ // disableMapAgg() sets doSpill, so we can't get here while the buffered maps still have contents.
+ // if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
+ return processInput();
+ } else {
Result inp = processInput();
if (inp.returnStatus == POStatus.STATUS_ERR) {
- // error
return inp;
- }
- if (inp.returnStatus == POStatus.STATUS_EOP) {
+ } else if (inp.returnStatus == POStatus.STATUS_EOP) {
if (parentPlan.endOfAllInput) {
- // it is actually end of all input
- // start dumping results
- isFinished = true;
- logCapacityOfAggMap();
- // check if there was ANY input
- if (valueTuple == null) {
- return EOP_RESULT;
- }
-
- // first return agg for currentKey
- Result output = getOutput();
- aggMap.remove(currentKey);
-
- mapDumpIterator = aggMap.values().iterator();
-
- // free the variables not needed anymore
- currentKey = null;
- valueTuple = null;
-
- return output;
+ // parent input is over. flush what we have.
+ inputsExhausted = true;
+ startSpill();
+ LOG.info("Spilling last bits.");
+ continue;
} else {
- // return EOP
- return inp;
- }
+ return EOP_RESULT;
}
- if (inp.returnStatus == POStatus.STATUS_NULL) {
+ } else if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
- }
-
- // check if this operator is doing a good job of reducing the number
- // of records going to output to justify the costs of itself
- // if not , disable map partial agg
- if ((!sizeReductionChecked)) {
- checkSizeReduction();
-
- if (disableMapAgg) {
- // in-map partial aggregation just got disabled
- // return the new input record, it has not been aggregated
- return inp;
- }
- }
-
- // we have some real input data
-
- // setup input for keyplan
+ } else {
+ // add this input to map.
Tuple inpTuple = (Tuple) inp.result;
keyPlan.attachInput(inpTuple);
@@ -218,134 +188,208 @@ public class POPartialAgg extends Physic
}
Object key = keyRes.result;
keyPlan.detachInput();
+ numRecsInRawMap += 1;
+ addKeyValToMap(rawInputMap, key, inpTuple);
- if (valueTuple == null) {
- // this is the first record the operator is seeing
- // do initializations
- init(key, inpTuple);
- continue;
- } else {
- // check if key changed
- boolean keyChanged = (currentKey != null && key == null)
- || ((key != null) && (!key.equals(currentKey)));
-
- if (!keyChanged) {
- addToCurrentValues(inpTuple);
-
- // if there are enough number of values,
- // aggregate the values accumulated in valueTuple
- if (((DefaultDataBag) valueTuple.get(1)).size() >= MAX_SIZE_CURVAL_CACHE) {
- // not a key change, so store the agg result back to bag
- aggregateCurrentValues();
+ if (shouldAggregateFirstLevel()) {
+ aggregateFirstLevel();
+ }
+ if (shouldAggregateSecondLevel()) {
+ aggregateSecondLevel();
}
- continue;
- } else {// new key
+ if (shouldSpill()) {
+ LOG.info("Starting spill.");
+ startSpill(); // next time around, we'll start emitting.
+ }
+ }
+ }
+ }
+ }
- // compute aggregate for currentKey
- Result output = getOutput();
- if (output.returnStatus != POStatus.STATUS_OK) {
- return ERR_RESULT;
+ private void estimateMemThresholds() {
+ if (!mapAggDisabled()) {
+ LOG.info("Getting mem limits; considering " + ALL_POPARTS.size() + " POPArtialAgg objects.");
+
+ float percent = getPercentUsageFromProp();
+ memLimits = new MemoryLimits(ALL_POPARTS.size(), percent);
+ int estTotalMem = 0;
+ int estTuples = 0;
+ for (Map.Entry<Object, List<Tuple>> entry : rawInputMap.entrySet()) {
+ for (Tuple t : entry.getValue()) {
+ estTuples += 1;
+ int mem = (int) t.getMemorySize();
+ estTotalMem += mem;
+ memLimits.addNewObjSize(mem);
}
-
- // set new current key, value
- currentKey = key;
- resetCurrentValues();
- addToCurrentValues(inpTuple);
-
- // get existing result from map (if any) and add it to
- // current values
- Tuple existingResult = aggMap.get(key);
-
- // existingResult will be null only if key is absent in
- // aggMap
- if (existingResult != null) {
- addToCurrentValues(existingResult);
+ }
+ int totalTuples = memLimits.getCacheLimit();
+ LOG.info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
+ firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
+ secondTierThreshold = (int) (0.5 + totalTuples * (1f / sizeReduction));
+ LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
+ }
+ estimatedMemThresholds = true;
}
+
+ private void checkSizeReduction() throws ExecException {
+ int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap;
+ aggregateFirstLevel();
+ aggregateSecondLevel();
+ int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
+ LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
+ int minReduction = getMinOutputReductionFromProp();
+ LOG.info("Observed reduction factor: from " + numBeforeReduction +
+ " to " + numAfterReduction +
+ " => " + numBeforeReduction / numAfterReduction + ".");
+ if (numBeforeReduction / numAfterReduction < minReduction) {
+ LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + minReduction);
+ disableMapAgg();
+ }
+ sizeReduction = numBeforeReduction / numAfterReduction;
+ sizeReductionChecked = true;
+
+ }
+ private void disableMapAgg() throws ExecException {
+ startSpill();
+ disableMapAgg = true;
+ }
- // storing a new entry in the map, so update estimate of
- // num of entries that will fit into the map
- if (memLimits.getNumObjectsSizeAdded() < NUM_RESRECS_TO_SAMPLE_SZ_ESTIMATE) {
- updateMaxMapSize(output.result);
+ private boolean mapAggDisabled() {
+ return disableMapAgg;
}
- // check if it is time to dump some aggs from the hashmap
- if (aggMap.size() >= maxHashMapSize) {
- // dump 10% of max hash size because dumping just one
- // record at a time might result in most group key being
- // dumped (depending on hashmap implementation)
- // TODO: dump the least recently/frequently used entries
- numToDump = maxHashMapSize / 10;
- mapDumpIterator = aggMap.values().iterator();
+ private boolean shouldAggregateFirstLevel() {
+ if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
+ LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
+ }
+ return (numRecsInRawMap > firstTierThreshold);
+ }
- return output;
+ private boolean shouldAggregateSecondLevel() {
+ if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
+ LOG.info("Aggregating " + numRecsInProcessedMap + " secondary records.");
+ }
+ return (numRecsInProcessedMap > secondTierThreshold);
+ }
+
+ private boolean shouldSpill() {
+ // is this always the same as shouldAgg?
+ return shouldAggregateSecondLevel();
+ }
+
+ private void addKeyValToMap(Map<Object, List<Tuple>> map,
+ Object key, Tuple inpTuple) throws ExecException {
+ List<Tuple> value = map.get(key);
+ if (value == null) {
+ value = new ArrayList<Tuple>();
+ map.put(key, value);
+ }
+ value.add(inpTuple);
+ if (value.size() >= MAX_LIST_SIZE) {
+ boolean isFirst = (map == rawInputMap);
+ if (LOG.isInfoEnabled()){
+ LOG.info("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
+ }
+ if (isFirst) {
+ aggregateRawRow(key);
} else {
- // there is space available in the hashmap, store the
- // output there
- addOutputToAggMap(output);
+ aggregateSecondLevel();
+ }
+ }
}
- continue;
+ private void startSpill() throws ExecException {
+ if (!rawInputMap.isEmpty()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
+ }
+ aggregateFirstLevel();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
+ }
+ }
+ if (!processedInputMap.isEmpty()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
+ }
+ aggregateSecondLevel();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
+ }
}
+ doSpill = true;
+ spillingIterator = processedInputMap.entrySet().iterator();
}
+
+ private Result spillResult() throws ExecException {
+ // if no more to spill, return EOP_RESULT.
+ if (processedInputMap.isEmpty()) {
+ LOG.info("In spillResults(), processed map is empty -- done spilling.");
+ return EOP_RESULT;
+ } else {
+ Map.Entry<Object, List<Tuple>> entry = spillingIterator.next();
+ Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
+ numRecsInProcessedMap -= entry.getValue().size();
+ spillingIterator.remove();
+ Result res = getOutput(entry.getKey(), valueTuple);
+ return res;
}
}
- private void updateMaxMapSize(Object result) {
- long size = SizeUtil.getMapEntrySize(currentKey,
- result);
- memLimits.addNewObjSize(size);
- maxHashMapSize = memLimits.getCacheLimit();
+ private void aggregateRawRow(Object key) throws ExecException {
+ List<Tuple> value = rawInputMap.get(key);
+ Tuple valueTuple = createValueTuple(key, value);
+ Result res = getOutput(key, valueTuple);
+ rawInputMap.remove(key);
+ addKeyValToMap(processedInputMap, key, getAggResultTuple(res.result));
+ numRecsInProcessedMap += valueTuple.size() - 1;
}
/**
- * Aggregate values accumulated in
- *
+ * For each entry in rawInputMap, feed the list of tuples into the aggregator funcs
+ * and add the results to processedInputMap. Remove the entries from rawInputMap as we go.
* @throws ExecException
*/
- private void aggregateCurrentValues() throws ExecException {
- for (int i = 0; i < valuePlans.size(); i++) {
- valuePlans.get(i).attachInput(valueTuple);
- Result valRes = getResult(valueLeaves.get(i));
- if (valRes == ERR_RESULT) {
- throw new ExecException(
- "Error computing aggregate during in-map partial aggregation");
+ private int aggregate(Map<Object, List<Tuple>> fromMap, Map<Object, List<Tuple>> toMap, int numEntriesInTarget) throws ExecException {
+ Iterator<Map.Entry<Object, List<Tuple>>> iter = fromMap.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<Object, List<Tuple>> entry = iter.next();
+ Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
+ Result res = getOutput(entry.getKey(), valueTuple);
+ iter.remove();
+ addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
+ numEntriesInTarget += valueTuple.size() - 1;
}
-
- Tuple aggVal = getAggResultTuple(valRes.result);
-
- // i'th plan should read only from i'th bag
- // so we are done with i'th bag, clear it and
- // add the new agg result to it
- DataBag valBag = (DataBag) valueTuple.get(i + 1);
- valBag.clear();
- valBag.add(aggVal);
-
- valuePlans.get(i).detachInput();
+ return numEntriesInTarget;
}
+
+ private void aggregateFirstLevel() throws ExecException {
+ numRecsInProcessedMap = aggregate(rawInputMap, processedInputMap, numRecsInProcessedMap);
+ numRecsInRawMap = 0;
}
- private void init(Object key, Tuple inpTuple) throws ExecException {
- tupleFact = TupleFactory.getInstance();
+ private void aggregateSecondLevel() throws ExecException {
+ Map<Object, List<Tuple>> newMap = Maps.newHashMapWithExpectedSize(processedInputMap.size());
+ numRecsInProcessedMap = aggregate(processedInputMap, newMap, 0);
+ processedInputMap = newMap;
+ }
- // value tuple has bags of values for currentKey
- valueTuple = tupleFact.newTuple(valuePlans.size() + 1);
+ private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws ExecException {
+ Tuple valueTuple = TF.newTuple(valuePlans.size() + 1);
+ valueTuple.set(0, key);
for (int i = 0; i < valuePlans.size(); i++) {
- valueTuple.set(i + 1, new DefaultDataBag(new ArrayList<Tuple>(
- MAX_SIZE_CURVAL_CACHE)));
+ DataBag bag = BG.newDefaultBag();
+ valueTuple.set(i + 1, bag);
+ }
+ for (Tuple t : inpTuples) {
+ for (int i = 1; i < t.size(); i++) {
+ DataBag bag = (DataBag) valueTuple.get(i);
+ bag.add((Tuple) t.get(i));
+ }
}
- // set current key, add value
- currentKey = key;
- addToCurrentValues(inpTuple);
- aggMap = new HashMap<Object, Tuple>();
-
- // TODO: keep track of actual number of objects that share the
- // memory limit. For now using a default of 3, which is what is
- // used by InternalCachedBag
- memLimits = new MemoryLimits(3, -1);
- maxHashMapSize = Integer.MAX_VALUE;
-
+ return valueTuple;
}
private Tuple getAggResultTuple(Object result) throws ExecException {
@@ -357,109 +401,43 @@ public class POPartialAgg extends Physic
}
}
- private void checkSizeReduction() throws ExecException {
-
- num_inp_recs++;
- if (num_inp_recs == NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION
- || (aggMap != null && aggMap.size() == maxHashMapSize - 1)) {
- // the above check for the hashmap current size is
- // done to avoid having to keep track of any dumps that
- // could
- // happen before NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION is
- // reached
-
- sizeReductionChecked = true;
-
- // find out how many output records we have for this many
- // input records
-
- int outputReduction = aggMap.size() == 0 ? Integer.MAX_VALUE
- : num_inp_recs / aggMap.size();
- int min_output_reduction = getMinOutputReductionFromProp();
- if (outputReduction < min_output_reduction) {
- disableMapAgg = true;
- log.info("Disabling in-map partial aggregation because the "
- + "reduction in tuples (" + outputReduction
- + ") is lower than threshold (" + min_output_reduction
- + ")");
- logCapacityOfAggMap();
- // get current key vals output
- Result output = getOutput();
-
- // free the variables not needed anymore
- currentKey = null;
- valueTuple = null;
-
- // store the output into hash map for now
- addOutputToAggMap(output);
-
- mapDumpIterator = aggMap.values().iterator();
- }
- }
-
- }
-
- private void logCapacityOfAggMap() {
- log.info("Maximum capacity of hashmap used for map"
- + " partial aggregation was " + maxHashMapSize + " entries");
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ // combiner optimizer does not get invoked if the plan is being executed
+ // under illustrate, so POPartialAgg should not get used in that case
+ throw new UnsupportedOperationException();
}
- private void addOutputToAggMap(Result output) throws ExecException {
- aggMap.put(((Tuple) output.result).get(0), (Tuple) output.result);
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visitPartialAgg(this);
}
private int getMinOutputReductionFromProp() {
int minReduction = PigMapReduce.sJobConfInternal.get().getInt(
- PROP_PARTAGG_MINREDUCTION, 0);
-
+ PigConfiguration.PARTAGG_MINREDUCTION, DEFAULT_MIN_REDUCTION);
if (minReduction <= 0) {
- // the default minimum reduction is 10
+ LOG.info("Specified reduction is < 0 (" + minReduction + "). Using default " + DEFAULT_MIN_REDUCTION);
minReduction = DEFAULT_MIN_REDUCTION;
}
return minReduction;
}
- private Result getNextResFromMap() {
- if (!mapDumpIterator.hasNext()) {
- mapDumpIterator = null;
- return EOP_RESULT;
+ private float getPercentUsageFromProp() {
+ float percent = 0.2F;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String usage = PigMapReduce.sJobConfInternal.get().get(
+ PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+ if (usage != null) {
+ percent = Float.parseFloat(usage);
}
- Tuple outTuple = mapDumpIterator.next();
- mapDumpIterator.remove();
- return new Result(POStatus.STATUS_OK, outTuple);
- }
-
- private Result getOutput() throws ExecException {
- Tuple output = tupleFact.newTuple(valuePlans.size() + 1);
- output.set(0, currentKey);
-
- for (int i = 0; i < valuePlans.size(); i++) {
- valuePlans.get(i).attachInput(valueTuple);
- Result valRes = getResult(valueLeaves.get(i));
- if (valRes == ERR_RESULT) {
- return ERR_RESULT;
- }
- output.set(i + 1, valRes.result);
}
- return new Result(POStatus.STATUS_OK, output);
+ return percent;
}
- private void resetCurrentValues() throws ExecException {
- for (int i = 1; i < valueTuple.size(); i++) {
- ((DataBag) valueTuple.get(i)).clear();
- }
- }
-
- private void addToCurrentValues(Tuple inpTuple) throws ExecException {
- for (int i = 1; i < inpTuple.size(); i++) {
- DataBag bag = (DataBag) valueTuple.get(i);
- bag.add((Tuple) inpTuple.get(i));
- }
- }
private Result getResult(ExpressionOperator op) throws ExecException {
Result res = ERR_RESULT;
-
switch (op.getResultType()) {
case DataType.BAG:
case DataType.BOOLEAN:
@@ -488,6 +466,28 @@ public class POPartialAgg extends Physic
return ERR_RESULT;
}
+ /**
+ * Runs the provided key-value pair through the aggregator plans.
+ * @param key
+ * @param value
+ * @return Result, containing a tuple of form (key, tupleReturnedByPlan1, tupleReturnedByPlan2, ...)
+ * @throws ExecException
+ */
+ private Result getOutput(Object key, Tuple value) throws ExecException {
+ Tuple output = TF.newTuple(valuePlans.size() + 1);
+ output.set(0, key);
+
+ for (int i = 0; i < valuePlans.size(); i++) {
+ valuePlans.get(i).attachInput(value);
+ Result valRes = getResult(valueLeaves.get(i));
+ if (valRes == ERR_RESULT) {
+ return ERR_RESULT;
+ }
+ output.set(i + 1, valRes.result);
+ }
+ return new Result(POStatus.STATUS_OK, output);
+ }
+
@Override
public boolean supportsMultipleInputs() {
return false;
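To make the control flow of the rewritten POPartialAgg easier to follow, here is a compressed, self-contained sketch of the two-tier scheme. It is illustrative only: String keys, Long values, and a summing stand-in replace the real tuples and combiner plans, and the thresholds are hard-coded rather than estimated from sampled memory usage as estimateMemThresholds() does above.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    // Sketch of POPartialAgg's two-tier buffering. Only the structure
    // mirrors the operator; keys, values and the "combiner" are stand-ins.
    public class TwoTierAggSketch {
        private static final int FIRST_TIER_THRESHOLD = 20000;
        private static final int SECOND_TIER_THRESHOLD = 2000; // 20000 / min reduction of 10

        private Map<String, List<Long>> rawMap = new HashMap<>();
        private Map<String, List<Long>> processedMap = new HashMap<>();
        private int numRaw = 0;
        private int numProcessed = 0;

        public void add(String key, long val) {
            rawMap.computeIfAbsent(key, k -> new ArrayList<>()).add(val);
            numRaw++;
            if (numRaw > FIRST_TIER_THRESHOLD) {
                // First tier full: combine raw lists into the processed map.
                numProcessed = aggregate(rawMap, processedMap, numProcessed);
                numRaw = 0;
            }
            if (numProcessed > SECOND_TIER_THRESHOLD) {
                // Second tier full: re-combine the processed map into a fresh one.
                // (The real operator also starts spilling results downstream here.)
                Map<String, List<Long>> fresh = new HashMap<>();
                numProcessed = aggregate(processedMap, fresh, 0);
                processedMap = fresh;
            }
        }

        // Fold each key's buffered values into a single partial result in toMap,
        // removing entries from fromMap as we go (mirrors aggregate() above).
        private static int aggregate(Map<String, List<Long>> fromMap,
                Map<String, List<Long>> toMap, int numInTarget) {
            Iterator<Map.Entry<String, List<Long>>> it = fromMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, List<Long>> e = it.next();
                long sum = 0; // stand-in for the combiner plans
                for (long v : e.getValue()) {
                    sum += v;
                }
                it.remove();
                toMap.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(sum);
                numInTarget++;
            }
            return numInTarget;
        }

        // Drain all partial results; mirrors startSpill()/spillResult().
        public Map<String, List<Long>> finish() {
            numProcessed = aggregate(rawMap, processedMap, numProcessed);
            numRaw = 0;
            return processedMap;
        }
    }

Calling add() for every input record and finish() at end-of-input yields one or a few buffered partial results per key instead of one record per input row, which is the reduction that checkSizeReduction() measures against pig.exec.mapPartAgg.minReduction.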
Modified: pig/trunk/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/Distinct.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/Distinct.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/Distinct.java Thu Aug 30 05:10:34 2012
@@ -27,6 +27,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.SingleTupleBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -82,8 +83,7 @@ public class Distinct extends EvalFunc<
// representing the data we want to distinct.
// unwrap, put in a bag and send down
try {
- DataBag bag = bagFactory.newDefaultBag();
- bag.add((Tuple)input.get(0));
+ DataBag bag = new SingleTupleBag((Tuple)input.get(0));
return tupleFactory.newTuple(bag);
} catch (ExecException e) {
throw e;
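Design note on the change above (editorial, hedged): in the Initial pass of algebraic Distinct, the incoming tuple wraps exactly one item, so allocating a full default bag per input record (backed by an ArrayList and tracked by the spill machinery) is avoidable overhead. SingleTupleBag is a minimal DataBag implementation that wraps the one tuple directly, which should make this map-side hot path cheaper while behaving the same for the downstream Intermediate/Final steps.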
Modified: pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Thu Aug 30 05:10:34 2012
@@ -36,6 +36,7 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -81,7 +82,7 @@ public class InternalDistinctBag extends
if (percent < 0) {
percent = 0.2F;
if (PigMapReduce.sJobConfInternal.get() != null) {
- String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
+ String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
if (usage != null) {
percent = Float.parseFloat(usage);
}
@@ -95,15 +96,18 @@ public class InternalDistinctBag extends
mContents = new HashSet<Tuple>();
}
+ @Override
public boolean isSorted() {
return false;
}
+ @Override
public boolean isDistinct() {
return true;
}
+ @Override
public long size() {
if (mSpillFiles != null && mSpillFiles.size() > 0){
//We need to recalculate size to guarantee a count of unique
@@ -121,6 +125,7 @@ public class InternalDistinctBag extends
}
+ @Override
public Iterator<Tuple> iterator() {
return new DistinctDataBagIterator();
}
@@ -147,6 +152,7 @@ public class InternalDistinctBag extends
}
}
+ @Override
public void addAll(DataBag b) {
Iterator<Tuple> iter = b.iterator();
while(iter.hasNext()) {
@@ -154,6 +160,7 @@ public class InternalDistinctBag extends
}
}
+ @Override
public void addAll(Collection<Tuple> c) {
Iterator<Tuple> iter = c.iterator();
while(iter.hasNext()) {
@@ -173,11 +180,13 @@ public class InternalDistinctBag extends
public Tuple tuple;
public int fileNum;
+ @Override
@SuppressWarnings("unchecked")
public int compareTo(TContainer other) {
return tuple.compareTo(other.tuple);
}
+ @Override
public boolean equals(Object obj) {
if (obj instanceof TContainer) {
return compareTo((TContainer)obj) == 0;
@@ -186,6 +195,7 @@ public class InternalDistinctBag extends
return false;
}
+ @Override
public int hashCode() {
return tuple.hashCode();
}
@@ -214,12 +224,14 @@ public class InternalDistinctBag extends
}
}
+ @Override
public boolean hasNext() {
// See if we can find a tuple. If so, buffer it.
mBuf = next();
return mBuf != null;
}
+ @Override
public Tuple next() {
// This will report progress every 1024 times through next.
// This should be much faster than using mod.
@@ -245,6 +257,7 @@ public class InternalDistinctBag extends
/**
* Not implemented.
*/
+ @Override
public void remove() {}
private Tuple readFromTree() {
@@ -464,6 +477,7 @@ public class InternalDistinctBag extends
}
}
+ @Override
public long spill(){
return proactive_spill(null);
}
Modified: pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SelfSpillBag.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/SelfSpillBag.java Thu Aug 30 05:10:34 2012
@@ -17,6 +17,7 @@
*/
package org.apache.pig.data;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -51,7 +52,6 @@ public abstract class SelfSpillBag exten
@InterfaceStability.Evolving
public static class MemoryLimits {
- public static final String PROP_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
private long maxMemUsage;
private int cacheLimit = Integer.MAX_VALUE;
private long memUsage = 0;
@@ -71,7 +71,7 @@ public abstract class SelfSpillBag exten
percent = 0.2F;
if (PigMapReduce.sJobConfInternal.get() != null) {
String usage = PigMapReduce.sJobConfInternal.get().get(
- PROP_CACHEDBAG_MEMUSAGE);
+ PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
if (usage != null) {
percent = Float.parseFloat(usage);
}
@@ -79,7 +79,7 @@ public abstract class SelfSpillBag exten
}
long max = Runtime.getRuntime().maxMemory();
- maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
+ maxMemUsage = (long) ((max * percent) / bagCount);
// set limit to 0, if memusage is 0 or really really small.
// then all tuples are put into disk
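A worked example of the memory split computed above, with illustrative numbers (the heap size, percent, and bag count are assumptions for the example, not values from the commit):

    // maxMemUsage = (max * percent) / bagCount
    long max = 1024L * 1024L * 1024L; // stand-in for Runtime.getRuntime().maxMemory(): 1 GiB
    float percent = 0.2f;             // pig.cachedbag.memusage default
    int bagCount = 4;                 // bags sharing the budget
    long maxMemUsage = (long) ((max * percent) / bagCount);
    // maxMemUsage comes out to about 53.7 million bytes, roughly 51 MiB per bag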
Modified: pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java (original)
+++ pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java Thu Aug 30 05:10:34 2012
@@ -56,13 +56,8 @@ public class PigPerformanceLoader extend
class Caster implements LoadCaster {
Utf8StorageConverter helper = new Utf8StorageConverter();
- /**
- *
- */
- public Caster() {
- // TODO Auto-generated constructor stub
- }
+ @Override
public DataBag bytesToBag(byte[] b, ResourceFieldSchema fs) throws IOException {
if (b == null) return null;
@@ -102,10 +97,12 @@ public class PigPerformanceLoader extend
return bag;
}
+ @Override
public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
- throw new UnsupportedOperationException();
+ return helper.bytesToMap(b);
}
+ @Override
public Map<String, Object> bytesToMap(byte[] b) throws IOException {
if (b == null) return null;