Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC
svn commit: r1642132 [5/14] - in /pig/branches/spark: ./ bin/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/
contrib/piggybank/java/sr...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Thu Nov 27 12:49:54 2014
@@ -108,20 +108,22 @@ public class POLimit extends PhysicalOpe
default:
throw new RuntimeException("Limit requires an integer parameter");
}
- if (variableLimit <= 0)
- throw new RuntimeException("Limit requires a positive integer parameter");
+ if (variableLimit < 0)
+ throw new RuntimeException("Limit requires a zero or a positive integer parameter");
this.setLimit(variableLimit);
}
Result inp = null;
while (true) {
+ // illustrator ignore LIMIT before the post processing
+ if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar >= mLimit) {
+ inp = RESULT_EOP;
+ break;
+ }
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
break;
illustratorMarkup(inp.result, null, 0);
- // illustrator ignore LIMIT before the post processing
- if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar>=mLimit)
- inp.returnStatus = POStatus.STATUS_EOP;
soFar++;
break;
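The POLimit change above relaxes the parameter check to allow LIMIT 0 and moves the soFar >= mLimit test ahead of processInput(), so the operator can return EOP as soon as the limit is reached instead of consuming one more input tuple. A minimal standalone sketch of the same control flow, with illustrative names rather than the actual POLimit fields:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    final class LimitSketch {
        // Returns at most 'limit' elements; limit == 0 is valid and yields nothing.
        static <T> List<T> take(List<T> input, long limit) {
            if (limit < 0) {
                throw new IllegalArgumentException("Limit requires zero or a positive integer");
            }
            List<T> out = new ArrayList<T>();
            long soFar = 0;
            Iterator<T> it = input.iterator();
            while (true) {
                if (soFar >= limit) {    // checked before pulling the next record
                    break;               // the operator equivalent is returning EOP here
                }
                if (!it.hasNext()) {
                    break;               // input exhausted
                }
                out.add(it.next());
                soFar++;
            }
            return out;
        }
    }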
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Thu Nov 27 12:49:54 2014
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.IOException;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,9 @@ public class POLoad extends PhysicalOper
private boolean isTmpLoad;
private long limit=-1;
+
+ private transient List<String> cacheFiles = null;
+ private transient List<String> shipFiles = null;
public POLoad(OperatorKey k) {
this(k,-1, null);
@@ -252,4 +256,20 @@ public class POLoad extends PhysicalOper
public void setLimit(long limit) {
this.limit = limit;
}
+
+ public List<String> getCacheFiles() {
+ return cacheFiles;
+ }
+
+ public void setCacheFiles(List<String> cf) {
+ cacheFiles = cf;
+ }
+
+ public List<String> getShipFiles() {
+ return shipFiles;
+ }
+
+ public void setShipFiles(List<String> sf) {
+ shipFiles = sf;
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Nov 27 12:49:54 2014
@@ -22,11 +22,13 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -75,16 +77,16 @@ public class POPackage extends PhysicalO
protected static final BagFactory mBagFactory = BagFactory.getInstance();
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
- private boolean firstTime = true;
-
- private boolean useDefaultBag = false;
-
private boolean lastBagReadOnly = true;
protected Packager pkgr;
protected PigNullableWritable keyWritable;
+ private transient boolean initialized;
+ private transient boolean useDefaultBag;
+ private transient int accumulativeBatchSize;
+
public POPackage(OperatorKey k) {
this(k, -1, null);
}
@@ -189,15 +191,17 @@ public class POPackage extends PhysicalO
*/
@Override
public Result getNextTuple() throws ExecException {
- if(firstTime){
- firstTime = false;
+ if (!initialized) {
+ initialized = true;
if (PigMapReduce.sJobConfInternal.get() != null) {
String bagType = PigMapReduce.sJobConfInternal.get().get(
- "pig.cachedbag.type");
+ PigConfiguration.PIG_CACHEDBAG_TYPE);
if (bagType != null && bagType.equalsIgnoreCase("default")) {
useDefaultBag = true;
}
}
+ accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
+
// If multiquery, the last bag is InternalCachedBag and should not
// set ReadOnly flag, otherwise we will materialize again to another
// InternalCachedBag
@@ -220,9 +224,7 @@ public class POPackage extends PhysicalO
// create bag wrapper to pull tuples in many batches
// all bags have reference to the sample tuples buffer
// which contains tuples from one batch
- POPackageTupleBuffer buffer = new POPackageTupleBuffer();
- buffer.setKey(key);
- buffer.setIterator(tupIter);
+ POPackageTupleBuffer buffer = new POPackageTupleBuffer(accumulativeBatchSize, key, tupIter);
for (int i = 0; i < numInputs; i++) {
dbs[i] = new AccumulativeBag(buffer, i);
}
@@ -317,29 +319,16 @@ public class POPackage extends PhysicalO
private Object currKey;
@SuppressWarnings("unchecked")
- public POPackageTupleBuffer() {
- batchSize = 20000;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- String size = PigMapReduce.sJobConfInternal.get().get("pig.accumulative.batchsize");
- if (size != null) {
- batchSize = Integer.parseInt(size);
- }
- }
-
+ public POPackageTupleBuffer(int batchSize, Object key, Iterator<NullableTuple> iter) {
+ this.batchSize = batchSize;
+ this.currKey = key;
+ this.iter = iter;
this.bags = new List[numInputs];
for(int i=0; i<numInputs; i++) {
- this.bags[i] = new ArrayList<Tuple>();
+ this.bags[i] = new ArrayList<Tuple>(batchSize);
}
}
- public void setKey(Object key) {
- this.currKey = key;
- }
-
- public void setIterator(Iterator<NullableTuple> iter) {
- this.iter = iter;
- }
-
@Override
public boolean hasNextBatch() {
return iter.hasNext();
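In the POPackage hunk above, the serialized firstTime/useDefaultBag fields become transient, the bag type (PIG_CACHEDBAG_TYPE) and the accumulative batch size are read from the job configuration once per task on the first getNextTuple() call, and POPackageTupleBuffer now receives the batch size, key and iterator through its constructor instead of setters. A minimal sketch of the lazy one-time-initialization idiom, assuming a hypothetical operator and a plain Properties object standing in for PigMapReduce.sJobConfInternal:

    import java.util.Properties;

    final class LazyInitSketch {
        // transient in the real operator: reset to defaults after deserialization on each task
        private transient boolean initialized;
        private transient boolean useDefaultBag;
        private transient int accumulativeBatchSize;

        private final Properties conf;   // stand-in for the job configuration

        LazyInitSketch(Properties conf) {
            this.conf = conf;
        }

        void nextTuple() {
            if (!initialized) {          // runs once per task, not once per record
                initialized = true;
                useDefaultBag = "default".equalsIgnoreCase(conf.getProperty("pig.cachedbag.type"));
                accumulativeBatchSize =
                        Integer.parseInt(conf.getProperty("pig.accumulative.batchsize", "20000"));
            }
            // ... build bags and buffers using useDefaultBag and accumulativeBatchSize ...
        }
    }

Because the fields are transient they are not shipped with the serialized physical plan, so each backend task starts from the defaults and re-reads its own configuration.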
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Thu Nov 27 12:49:54 2014
@@ -36,14 +36,15 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.SelfSpillBag.MemoryLimits;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.GroupingSpillable;
import org.apache.pig.impl.util.Spillable;
import org.apache.pig.impl.util.SpillableMemoryManager;
@@ -56,7 +57,7 @@ import com.google.common.collect.Maps;
* map. Once that map fills up or all input has been seen, results are
* piped out into the next operator (caller of getNext()).
*/
-public class POPartialAgg extends PhysicalOperator implements Spillable {
+public class POPartialAgg extends PhysicalOperator implements Spillable, GroupingSpillable {
private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
private static final long serialVersionUID = 1L;
@@ -83,33 +84,44 @@ public class POPartialAgg extends Physic
private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<POPartialAgg, Byte>();
private static final TupleFactory TF = TupleFactory.getInstance();
- private static final BagFactory BG = BagFactory.getInstance();
private PhysicalPlan keyPlan;
private ExpressionOperator keyLeaf;
-
private List<PhysicalPlan> valuePlans;
private List<ExpressionOperator> valueLeaves;
- private int numRecsInRawMap = 0;
- private int numRecsInProcessedMap = 0;
+ private transient int numRecsInRawMap;
+ private transient int numRecsInProcessedMap;
- private Map<Object, List<Tuple>> rawInputMap = Maps.newHashMap();
- private Map<Object, List<Tuple>> processedInputMap = Maps.newHashMap();
+ private transient Map<Object, List<Tuple>> rawInputMap;
+ private transient Map<Object, List<Tuple>> processedInputMap;
- private boolean disableMapAgg = false;
- private boolean sizeReductionChecked = false;
- private boolean inputsExhausted = false;
- private volatile boolean doSpill = false;
- private transient MemoryLimits memLimits;
-
- private transient boolean initialized = false;
- private int firstTierThreshold = FIRST_TIER_THRESHOLD;
- private int secondTierThreshold = SECOND_TIER_THRESHOLD;
- private int sizeReduction = 1;
- private int avgTupleSize = 0;
- private Iterator<Entry<Object, List<Tuple>>> spillingIterator;
- private boolean estimatedMemThresholds = false;
+ // Transient booleans always initialize to false
+ private transient boolean initialized;
+ private transient boolean disableMapAgg;
+ private transient boolean sizeReductionChecked;
+ private transient boolean inputsExhausted;
+ private transient boolean estimatedMemThresholds;
+ // The doSpill flag is set when spilling is running or needs to run.
+ // It is set by POPartialAgg when its buffers are full after having run aggregations and
+ // the records have to be emitted to the map output.
+ // The doContingentSpill flag is set when the SpillableMemoryManager is notified
+ // by GC that the runtime is low on memory and the SpillableMemoryManager identifies
+ // the particular buffer as a good spill candidate because it is large. The contingent spill logic tries
+ // to satisfy the memory manager's request for freeing memory by aggregating data
+ // rather than just spilling records to disk.
+ private transient volatile boolean doSpill;
+ private transient volatile boolean doContingentSpill;
+ private transient volatile Object spillLock;
+
+ private transient int minOutputReduction;
+ private transient float percentUsage;
+ private transient int numRecordsToSample;
+ private transient int firstTierThreshold;
+ private transient int secondTierThreshold;
+ private transient int sizeReduction;
+ private transient int avgTupleSize;
+ private transient Iterator<Entry<Object, List<Tuple>>> spillingIterator;
public POPartialAgg(OperatorKey k) {
@@ -118,10 +130,38 @@ public class POPartialAgg extends Physic
private void init() throws ExecException {
ALL_POPARTS.put(this, null);
- float percent = getPercentUsageFromProp();
- if (percent <= 0) {
+ numRecsInRawMap = 0;
+ numRecsInProcessedMap = 0;
+ rawInputMap = Maps.newHashMap();
+ processedInputMap = Maps.newHashMap();
+ minOutputReduction = DEFAULT_MIN_REDUCTION;
+ numRecordsToSample = NUM_RECS_TO_SAMPLE;
+ firstTierThreshold = FIRST_TIER_THRESHOLD;
+ secondTierThreshold = SECOND_TIER_THRESHOLD;
+ sizeReduction = 1;
+ avgTupleSize = 0;
+ percentUsage = 0.2F;
+ spillLock = new Object();
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String usage = PigMapReduce.sJobConfInternal.get().get(
+ PigConfiguration.PIG_CACHEDBAG_MEMUSAGE);
+ if (usage != null) {
+ percentUsage = Float.parseFloat(usage);
+ }
+ minOutputReduction = PigMapReduce.sJobConfInternal.get().getInt(
+ PigConfiguration.PIG_EXEC_MAP_PARTAGG_MINREDUCTION, DEFAULT_MIN_REDUCTION);
+ if (minOutputReduction <= 0) {
+ LOG.info("Specified reduction is <= 0 (" + minOutputReduction + "). Using default " +
+ DEFAULT_MIN_REDUCTION);
+ minOutputReduction = DEFAULT_MIN_REDUCTION;
+ }
+ }
+ if (percentUsage <= 0) {
LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
disableMapAgg();
+ // Set them to true instead of adding another check for !disableMapAgg
+ sizeReductionChecked = true;
+ estimatedMemThresholds = true;
}
initialized = true;
SpillableMemoryManager.getInstance().registerSpillable(this);
@@ -145,17 +185,36 @@ public class POPartialAgg extends Physic
}
while (true) {
- if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+ if (!sizeReductionChecked && numRecsInRawMap >= numRecordsToSample) {
checkSizeReduction();
+ if (doContingentSpill && !doSpill) {
+ LOG.info("Avoided emitting records during spill memory call.");
+ doContingentSpill = false;
+ }
}
- if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) {
+ if (!estimatedMemThresholds && numRecsInRawMap >= numRecordsToSample) {
estimateMemThresholds();
}
+ if (doContingentSpill) {
+ // Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
+ if (doSpill == false) {
+ // SpillableMemoryManager requested a spill to reduce memory
+ // consumption. See if we can avoid it.
+ aggregateBothLevels(false, false);
+ if (shouldSpill()) {
+ startSpill(false);
+ } else {
+ LOG.info("Avoided emitting records during spill memory call.");
+ doContingentSpill = false;
+ }
+ }
+ }
if (doSpill) {
- startSpill();
+ startSpill(true);
Result result = spillResult();
if (result.returnStatus == POStatus.STATUS_EOP) {
doSpill = false;
+ doContingentSpill = false;
}
if (result.returnStatus != POStatus.STATUS_EOP
|| inputsExhausted) {
@@ -174,8 +233,8 @@ public class POPartialAgg extends Physic
if (parentPlan.endOfAllInput) {
// parent input is over. flush what we have.
inputsExhausted = true;
- startSpill();
LOG.info("Spilling last bits.");
+ startSpill(true);
continue;
} else {
return EOP_RESULT;
@@ -197,15 +256,9 @@ public class POPartialAgg extends Physic
numRecsInRawMap += 1;
addKeyValToMap(rawInputMap, key, inpTuple);
- if (shouldAggregateFirstLevel()) {
- aggregateFirstLevel();
- }
- if (shouldAggregateSecondLevel()) {
- aggregateSecondLevel();
- }
+ aggregateBothLevels(true, true);
if (shouldSpill()) {
- LOG.info("Starting spill.");
- startSpill(); // next time around, we'll start emitting.
+ startSpill(false); // next time around, we'll start emitting.
}
}
}
@@ -214,10 +267,10 @@ public class POPartialAgg extends Physic
private void estimateMemThresholds() {
if (!mapAggDisabled()) {
- LOG.info("Getting mem limits; considering " + ALL_POPARTS.size() + " POPArtialAgg objects.");
-
- float percent = getPercentUsageFromProp();
- memLimits = new MemoryLimits(ALL_POPARTS.size(), percent);
+ LOG.info("Getting mem limits; considering " + ALL_POPARTS.size()
+ + " POPartialAgg objects with memory percentage "
+ + percentUsage);
+ MemoryLimits memLimits = new MemoryLimits(ALL_POPARTS.size(), percentUsage);
int estTotalMem = 0;
int estTuples = 0;
for (Map.Entry<Object, List<Tuple>> entry : rawInputMap.entrySet()) {
@@ -234,30 +287,39 @@ public class POPartialAgg extends Physic
firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction)));
secondTierThreshold = (int) (0.5 + totalTuples * (1f / sizeReduction));
LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold);
+ // The second tier should at least allow one tuple before it tries to aggregate.
+ // This code retains the total number of tuples in the buffer while guaranteeing
+ // the second tier has at least one tuple.
+ if (secondTierThreshold == 0) {
+ secondTierThreshold += 1;
+ firstTierThreshold -= 1;
+ }
}
estimatedMemThresholds = true;
}
private void checkSizeReduction() throws ExecException {
- int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap;
- aggregateFirstLevel();
- aggregateSecondLevel();
- int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
- LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
- int minReduction = getMinOutputReductionFromProp();
- LOG.info("Observed reduction factor: from " + numBeforeReduction +
- " to " + numAfterReduction +
- " => " + numBeforeReduction / numAfterReduction + ".");
- if ( numBeforeReduction / numAfterReduction < minReduction) {
- LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + minReduction);
- disableMapAgg();
+ if (!mapAggDisabled()) {
+ int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap;
+ aggregateBothLevels(false, false);
+ int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap;
+ LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap);
+ LOG.info("Observed reduction factor: from " + numBeforeReduction +
+ " to " + numAfterReduction +
+ " => " + numBeforeReduction / numAfterReduction + ".");
+ if ( numBeforeReduction / numAfterReduction < minOutputReduction) {
+ LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + minOutputReduction);
+ disableMapAgg();
+ }
+ sizeReduction = numBeforeReduction / numAfterReduction;
+ sizeReductionChecked = true;
}
- sizeReduction = numBeforeReduction / numAfterReduction;
- sizeReductionChecked = true;
}
private void disableMapAgg() throws ExecException {
- startSpill();
+ // Do not aggregate here: by the time disableMapAgg() is called, aggregation
+ // has already run and the size reduction has been checked
+ startSpill(false);
disableMapAgg = true;
}
@@ -266,16 +328,10 @@ public class POPartialAgg extends Physic
}
private boolean shouldAggregateFirstLevel() {
- if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) {
- LOG.info("Aggregating " + numRecsInRawMap + " raw records.");
- }
return (numRecsInRawMap > firstTierThreshold);
}
private boolean shouldAggregateSecondLevel() {
- if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) {
- LOG.info("Aggregating " + numRecsInProcessedMap + " secondary records.");
- }
return (numRecsInProcessedMap > secondTierThreshold);
}
@@ -305,27 +361,13 @@ public class POPartialAgg extends Physic
}
}
- private void startSpill() throws ExecException {
+ private void startSpill(boolean aggregate) throws ExecException {
// If spillingIterator is null, we are already spilling and don't need to set up.
if (spillingIterator != null) return;
- if (!rawInputMap.isEmpty()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples.");
- }
- aggregateFirstLevel();
- if (LOG.isInfoEnabled()) {
- LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
- }
- }
- if (!processedInputMap.isEmpty()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples.");
- }
- aggregateSecondLevel();
- if (LOG.isInfoEnabled()) {
- LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples.");
- }
+ LOG.info("Starting spill.");
+ if (aggregate) {
+ aggregateBothLevels(false, true);
}
doSpill = true;
spillingIterator = processedInputMap.entrySet().iterator();
@@ -374,15 +416,41 @@ public class POPartialAgg extends Physic
return numEntriesInTarget;
}
+ private void aggregateBothLevels(boolean checkThresholdForFirst,
+ boolean checkThresholdForSecond) throws ExecException {
+ // When processed map is initially empty, just aggregate first level as
+ // aggregating second level immediately would not yield anything
+ boolean aggregateSecondLevel = !processedInputMap.isEmpty();
+ if (!checkThresholdForFirst || shouldAggregateFirstLevel()) {
+ aggregateFirstLevel();
+ }
+ if (aggregateSecondLevel && (!checkThresholdForSecond || shouldAggregateSecondLevel())) {
+ aggregateSecondLevel();
+ }
+ }
+
private void aggregateFirstLevel() throws ExecException {
+ if (rawInputMap.isEmpty()) {
+ return;
+ }
+ int rawTuples = numRecsInRawMap;
+ int processedTuples = numRecsInProcessedMap;
numRecsInProcessedMap = aggregate(rawInputMap, processedInputMap, numRecsInProcessedMap);
numRecsInRawMap = 0;
+ LOG.info("Aggregated " + rawTuples+ " raw tuples."
+ + " Processed tuples before aggregation = " + processedTuples
+ + ", after aggregation = " + numRecsInProcessedMap);
}
private void aggregateSecondLevel() throws ExecException {
+ if (processedInputMap.isEmpty()) {
+ return;
+ }
+ int processedTuples = numRecsInProcessedMap;
Map<Object, List<Tuple>> newMap = Maps.newHashMapWithExpectedSize(processedInputMap.size());
numRecsInProcessedMap = aggregate(processedInputMap, newMap, 0);
processedInputMap = newMap;
+ LOG.info("Aggregated " + processedTuples + " processed tuples to " + numRecsInProcessedMap + " tuples");
}
private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws ExecException {
@@ -390,7 +458,14 @@ public class POPartialAgg extends Physic
valueTuple.set(0, key);
for (int i = 0; i < valuePlans.size(); i++) {
- DataBag bag = BG.newDefaultBag();
+ DataBag bag = null;
+ if (doContingentSpill) {
+ // Don't use additional memory since we already have memory stress
+ bag = new InternalCachedBag();
+ } else {
+ // Take 10% of memory; needs fine tuning later
+ bag = new InternalCachedBag(1, 0.1F);
+ }
valueTuple.set(i + 1, bag);
}
for (Tuple t : inpTuples) {
@@ -424,29 +499,6 @@ public class POPartialAgg extends Physic
v.visitPartialAgg(this);
}
- private int getMinOutputReductionFromProp() {
- int minReduction = PigMapReduce.sJobConfInternal.get().getInt(
- PigConfiguration.PARTAGG_MINREDUCTION, DEFAULT_MIN_REDUCTION);
- if (minReduction <= 0) {
- LOG.info("Specified reduction is < 0 (" + minReduction + "). Using default " + DEFAULT_MIN_REDUCTION);
- minReduction = DEFAULT_MIN_REDUCTION;
- }
- return minReduction;
- }
-
- private float getPercentUsageFromProp() {
- float percent = 0.2F;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- String usage = PigMapReduce.sJobConfInternal.get().get(
- PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
- if (usage != null) {
- percent = Float.parseFloat(usage);
- }
- }
- return percent;
- }
-
-
private Result getResult(ExpressionOperator op) throws ExecException {
Result res;
switch (op.getResultType()) {
@@ -536,9 +588,26 @@ public class POPartialAgg extends Physic
@Override
public long spill() {
- LOG.info("Spill triggered by SpillableMemoryManager");
- doSpill = true;
- return 0;
+ if (mapAggDisabled()) {
+ return 0;
+ } else {
+ LOG.info("Spill triggered by SpillableMemoryManager");
+ doContingentSpill = true;
+ synchronized(spillLock) {
+ if (!sizeReductionChecked) {
+ numRecordsToSample = numRecsInRawMap;
+ }
+ try {
+ while (doContingentSpill == true) {
+ Thread.sleep(50); //Keeping it on the lower side for now. Tune later
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted exception while waiting for spill to finish", e);
+ }
+ LOG.info("Finished spill for SpillableMemoryManager call");
+ return 1;
+ }
+ }
}
@Override
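The POPartialAgg hunks above introduce the doContingentSpill handshake described in the comments: spill() runs on the SpillableMemoryManager's notification thread, sets the flag and then polls until the processing thread has either aggregated enough in memory to satisfy the request or actually started spilling, at which point the flag is cleared. A minimal two-thread sketch of that flag interplay, with placeholder methods standing in for the real aggregation and spill logic:

    final class ContingentSpillSketch {
        private volatile boolean doContingentSpill;

        // Called from the memory manager's thread when the runtime is low on memory.
        long spill() throws InterruptedException {
            doContingentSpill = true;        // ask the processing thread to free memory
            while (doContingentSpill) {      // block until the request has been handled
                Thread.sleep(50);
            }
            return 1;                        // report that memory was released
        }

        // Called on the processing thread inside its record-processing loop.
        void maybeHandleContingentSpill() {
            if (doContingentSpill) {
                boolean reducedEnough = aggregateInMemory();
                if (!reducedEnough) {
                    spillToOutput();         // fall back to emitting records downstream
                }
                doContingentSpill = false;   // release the waiting memory manager
            }
        }

        private boolean aggregateInMemory() { return true; }  // placeholder
        private void spillToOutput() { }                      // placeholder
    }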
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Thu Nov 27 12:49:54 2014
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -77,7 +78,10 @@ public class POSort extends PhysicalOper
private long limit;
public boolean isUDFComparatorUsed = false;
private DataBag sortedBag;
- transient Iterator<Tuple> it;
+
+ private transient Iterator<Tuple> it;
+ private transient boolean initialized;
+ private transient boolean useDefaultBag;
public POSort(
OperatorKey k,
@@ -256,17 +260,19 @@ public class POSort extends PhysicalOper
if (!inputsAccumulated) {
res = processInput();
+ if (!initialized) {
+ initialized = true;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_SORT_TYPE);
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
+ }
// by default, we create InternalSortedBag, unless user configures
- // explicitly to use old bag
- String bagType = null;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.sort.type");
- }
- if (bagType != null && bagType.equalsIgnoreCase("default")) {
- sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
- } else {
- sortedBag = new InternalSortedBag(3, mComparator);
- }
+ // explicitly to use old bag
+ sortedBag = useDefaultBag ? BagFactory.getInstance().newSortedBag(mComparator)
+ : new InternalSortedBag(3, mComparator);
while (res.returnStatus != POStatus.STATUS_EOP) {
if (res.returnStatus == POStatus.STATUS_ERR) {
@@ -377,6 +383,7 @@ public class POSort extends PhysicalOper
}
+ @Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
if(illustrator != null) {
illustrator.getEquivalenceClasses().get(eqClassIndex).add((Tuple) in);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Thu Nov 27 12:49:54 2014
@@ -81,6 +81,9 @@ public class POStore extends PhysicalOpe
private String signature;
+ private transient List<String> cacheFiles = null;
+ private transient List<String> shipFiles = null;
+
public POStore(OperatorKey k) {
this(k, -1, null);
}
@@ -313,4 +316,20 @@ public class POStore extends PhysicalOpe
public void setStoreFunc(StoreFuncInterface storeFunc) {
this.storer = storeFunc;
}
+
+ public List<String> getCacheFiles() {
+ return cacheFiles;
+ }
+
+ public void setCacheFiles(List<String> cf) {
+ cacheFiles = cf;
+ }
+
+ public List<String> getShipFiles() {
+ return shipFiles;
+ }
+
+ public void setShipFiles(List<String> sf) {
+ shipFiles = sf;
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Thu Nov 27 12:49:54 2014
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -92,8 +93,8 @@ public class Packager implements Illustr
private PackageType pkgType;
- boolean firstTime = true;
- boolean useDefaultBag = false;
+ private transient boolean initialized;
+ private transient boolean useDefaultBag;
protected POPackage parent = null;
@@ -473,10 +474,10 @@ public class Packager implements Illustr
}
public void checkBagType() {
- if(firstTime){
- firstTime = false;
+ if(!initialized){
+ initialized = true;
if (PigMapReduce.sJobConfInternal.get() != null) {
- String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE);
if (bagType != null && bagType.equalsIgnoreCase("default")) {
useDefaultBag = true;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Nov 27 12:49:54 2014
@@ -19,16 +19,20 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
@@ -89,11 +93,26 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator.LoRearrangeDiscoverer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POIdentityInOutTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POShuffleTezLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
@@ -102,6 +121,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -135,6 +155,9 @@ import org.apache.tez.runtime.library.ap
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
/**
* A visitor to construct DAG out of Tez plan.
@@ -146,6 +169,7 @@ public class TezDagBuilder extends TezOp
private Map<String, LocalResource> localResources;
private PigContext pc;
private Configuration globalConf;
+ private long intermediateTaskInputSize;
public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
Map<String, LocalResource> localResources) {
@@ -162,6 +186,19 @@ public class TezDagBuilder extends TezOp
} catch (IOException e) {
throw new RuntimeException("Error while fetching delegation tokens", e);
}
+
+ try {
+ intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(FileSystem.get(globalConf), FileLocalizer.getTemporaryResourcePath(pc));
+ } catch (Exception e) {
+ log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
+ intermediateTaskInputSize = 134217728L;
+ }
+ // At least 128MB. Else we will end up with too many tasks
+ intermediateTaskInputSize = Math.max(intermediateTaskInputSize, 134217728L);
+ intermediateTaskInputSize = Math.min(intermediateTaskInputSize,
+ globalConf.getLong(
+ InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
}
@Override
@@ -173,8 +210,7 @@ public class TezDagBuilder extends TezOp
Vertex to = null;
try {
if (!tezOp.isVertexGroup()) {
- boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false;
- to = newVertex(tezOp, isMap);
+ to = newVertex(tezOp);
dag.addVertex(to);
} else {
// For union, we construct VertexGroup after iterating the
@@ -248,7 +284,8 @@ public class TezDagBuilder extends TezOp
}
return GroupInputEdge.create(from, to, edgeProperty,
- InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
+ InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload())
+ .setHistoryText(edgeProperty.getEdgeDestination().getHistoryText()));
}
/**
@@ -339,8 +376,9 @@ public class TezDagBuilder extends TezOp
MRToTezHelper.processMRSettings(conf, globalConf);
- in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
- out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+ String historyString = convertToHistoryText("", conf);
+ in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
+ out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
@@ -378,7 +416,7 @@ public class TezDagBuilder extends TezOp
.serialize(new byte[] { combRearrange.getKeyType() }));
}
- private Vertex newVertex(TezOperator tezOp, boolean isMap) throws IOException,
+ private Vertex newVertex(TezOperator tezOp) throws IOException,
ClassNotFoundException, InterruptedException {
ProcessorDescriptor procDesc = ProcessorDescriptor.create(
tezOp.getProcessorName());
@@ -395,12 +433,24 @@ public class TezDagBuilder extends TezOp
Job job = new Job(payloadConf);
payloadConf = (JobConf) job.getConfiguration();
- if (tezOp.sampleOperator != null) {
- payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.sampleOperator.getOperatorKey().toString());
+ if (tezOp.getSampleOperator() != null) {
+ payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
}
- if (tezOp.sortOperator != null) {
- payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.sortOperator.getOperatorKey().toString());
+ if (tezOp.getSortOperator() != null) {
+ // Required by Sample Aggregation job for estimating quantiles
+ payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.getSortOperator().getOperatorKey().toString());
+ // PIG-4162: Order by/Skew Join in intermediate stage.
+ // Increasing order by parallelism may not be required as it is
+ // usually followed by a limit rather than a store, but it would benefit
+ // cases like a skewed join followed by a group by.
+ if (tezOp.getSortOperator().getEstimatedParallelism() != -1
+ && TezCompilerUtil.isIntermediateReducer(tezOp.getSortOperator())) {
+ payloadConf.setLong(
+ InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ intermediateTaskInputSize);
+ }
+
}
payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
@@ -443,8 +493,7 @@ public class TezDagBuilder extends TezOp
tezOp.plan.remove(pack);
payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
setIntermediateOutputKeyValue(keyType, payloadConf, tezOp);
- POShuffleTezLoad newPack;
- newPack = new POShuffleTezLoad(pack);
+ POShuffleTezLoad newPack = new POShuffleTezLoad(pack);
if (tezOp.isSkewedJoin()) {
newPack.setSkewedJoins(true);
}
@@ -455,7 +504,7 @@ public class TezDagBuilder extends TezOp
// backend.
Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>();
for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
- if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) {
+ if (tezOp.getSampleOperator() != null && tezOp.getSampleOperator() == pred) {
// skip sample vertex input
} else {
String inputKey = pred.getOperatorKey().toString();
@@ -511,7 +560,7 @@ public class TezDagBuilder extends TezOp
}
}
}
- JobControlCompiler.setOutputFormat(job);
+ setOutputFormat(job);
// set parent plan in all operators. currently the parent plan is really
// used only when POStream, POSplit are present in the plan
@@ -546,19 +595,22 @@ public class TezDagBuilder extends TezOp
}
// set various parallelism into the job conf for later analysis, PIG-2779
- payloadConf.setInt("pig.info.reducers.default.parallel", pc.defaultParallel);
- payloadConf.setInt("pig.info.reducers.requested.parallel", tezOp.getRequestedParallelism());
- payloadConf.setInt("pig.info.reducers.estimated.parallel", tezOp.getEstimatedParallelism());
+ payloadConf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pc.defaultParallel);
+ payloadConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, tezOp.getRequestedParallelism());
+ payloadConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, tezOp.getEstimatedParallelism());
+
+ TezScriptState ss = TezScriptState.get();
+ ss.addVertexSettingsToConf(dag.getName(), tezOp, payloadConf);
// Take our assembled configuration and create a vertex
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
- procDesc.setUserPayload(userPayload);
+ procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(), payloadConf));
Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(),
- isMap ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf));
+ tezOp.isUseMRMapSettings() ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf));
Map<String, String> taskEnv = new HashMap<String, String>();
- MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, isMap);
+ MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, tezOp.isUseMRMapSettings());
vertex.setTaskEnvironment(taskEnv);
// All these classes are @InterfaceAudience.Private in Hadoop. Switch to Tez methods in TEZ-1012
@@ -571,7 +623,7 @@ public class TezDagBuilder extends TezOp
MRApps.setupDistributedCache(globalConf, localResources);
vertex.addTaskLocalFiles(localResources);
- vertex.setTaskLaunchCmdOpts(isMap ? MRHelpers.getJavaOptsForMRMapper(globalConf)
+ vertex.setTaskLaunchCmdOpts(tezOp.isUseMRMapSettings() ? MRHelpers.getJavaOptsForMRMapper(globalConf)
: MRHelpers.getJavaOptsForMRReducer(globalConf));
log.info("For vertex - " + tezOp.getOperatorKey().toString()
@@ -591,7 +643,8 @@ public class TezDagBuilder extends TezOp
DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
.setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
- .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer())),
+ .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer()))
+ .setHistoryText(convertToHistoryText("", payloadConf)),
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
}
@@ -609,7 +662,8 @@ public class TezDagBuilder extends TezOp
OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
MROutput.class.getName()).setUserPayload(TezUtils
- .createUserPayloadFromConf(outputPayLoad));
+ .createUserPayloadFromConf(outputPayLoad))
+ .setHistoryText(convertToHistoryText("", outputPayLoad));
if (tezOp.getVertexGroupStores() != null) {
OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
if (vertexGroupKey != null) {
@@ -632,14 +686,16 @@ public class TezDagBuilder extends TezOp
new PigOutputFormat().checkOutputSpecs(job);
}
+ String vmPluginName = null;
+ Configuration vmPluginConf = null;
+
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
// Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
// to decrease/increase parallelism of sorting vertex dynamically
// based on the numQuantiles calculated by sample aggregation vertex
- vertex.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
- PartitionerDefinedVertexManager.class.getName()));
+ vmPluginName = PartitionerDefinedVertexManager.class.getName();
log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
} else {
boolean containScatterGather = false;
@@ -655,24 +711,50 @@ public class TezDagBuilder extends TezOp
if (containScatterGather && !containCustomPartitioner) {
// Use auto-parallelism feature of ShuffleVertexManager to dynamically
// reduce the parallelism of the vertex
- VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(
- ShuffleVertexManager.class.getName());
- Configuration vmPluginConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+ vmPluginName = ShuffleVertexManager.class.getName();
+ vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
- if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
- InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)!=
+ if (stores.size() <= 0) {
+ // Intermediate reduce. Set the bytes per reducer to be block size.
+ vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ intermediateTaskInputSize);
+ } else if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
}
- vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
- vertex.setVertexManagerPlugin(vmPluginDescriptor);
log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
}
}
}
-
+ if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
+ if (tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName())) {
+ // Setting SRC_FRACTION to 0.00001 so that even if there are 100K source tasks,
+ // limit job starts when 1 source task finishes.
+ // If the limit is part of a group by or join (their parallelism is 1),
+ // the configuration is left at the defaults.
+ vmPluginName = ShuffleVertexManager.class.getName();
+ vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+ vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, "0.00001");
+ vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, "0.00001");
+ log.info("Set " + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 for limit vertex " + tezOp.getOperatorKey().toString());
+ }
+ }
+ // else if(tezOp.isLimitAfterSort())
+ // TODO: PIG-4049 If standalone Limit we need a new VertexManager or new input
+ // instead of ShuffledMergedInput. For limit part of the sort (order by parallel 1) itself
+ // need to enhance PartitionerDefinedVertexManager
+
+ if (vmPluginName != null) {
+ VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName);
+ if (vmPluginConf != null) {
+ vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf))
+ .setHistoryText(convertToHistoryText(vmPluginName, vmPluginConf));
+ }
+ vertex.setVertexManagerPlugin(vmPluginDescriptor);
+ }
// Reset udfcontext jobconf. It is not supposed to be set in the front end
UDFContext.getUDFContext().addJobConf(null);
return vertex;
@@ -943,4 +1025,48 @@ public class TezDagBuilder extends TezOp
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS,
comparatorClass);
}
+
+ private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
+ // the OutputFormat we report to Hadoop is always PigOutputFormat which
+ // can be wrapped with LazyOutputFormat provided if it is supported by
+ // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+ try {
+ Class<?> clazz = PigContext
+ .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+ Method method = clazz.getMethod("setOutputFormatClass",
+ org.apache.hadoop.mapreduce.Job.class, Class.class);
+ method.invoke(null, job, PigOutputFormatTez.class);
+ } catch (Exception e) {
+ job.setOutputFormatClass(PigOutputFormatTez.class);
+ log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+ + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+ }
+ } else {
+ job.setOutputFormatClass(PigOutputFormatTez.class);
+ }
+ }
+
+ // Borrowed from TezUtils.convertToHistoryText since it is not part of Tez 0.5.2
+ public static String convertToHistoryText(String description, Configuration conf) throws IOException {
+ // Add a version if this serialization is changed
+ JSONObject jsonObject = new JSONObject();
+ try {
+ if (description != null && !description.isEmpty()) {
+ jsonObject.put("desc", description);
+ }
+ if (conf != null) {
+ JSONObject confJson = new JSONObject();
+ Iterator<Entry<String, String>> iter = conf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ confJson.put(entry.getKey(), entry.getValue());
+ }
+ jsonObject.put("config", confJson);
+ }
+ } catch (JSONException e) {
+ throw new IOException("Error when trying to convert description/conf to JSON", e);
+ }
+ return jsonObject.toString();
+ }
}
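In the TezDagBuilder constructor above, intermediateTaskInputSize starts from the block size of the temporary directory, is floored at 128MB so intermediate reducers do not end up with too many small tasks, and is capped at the configured bytes-per-reducer. The same arithmetic as a small standalone sketch (method and constant names are illustrative, not part of Pig or Tez):

    final class IntermediateInputSizeSketch {
        static long desiredTaskInputSize(long blockSize, long bytesPerReducer) {
            long size = Math.max(blockSize, 134217728L);   // at least 128MB
            return Math.min(size, bytesPerReducer);        // never above bytes-per-reducer
        }

        public static void main(String[] args) {
            // e.g. 64MB blocks with a 1GB bytes-per-reducer setting -> 128MB per task
            System.out.println(desiredTaskInputSize(64L * 1024 * 1024, 1000L * 1000 * 1000));
        }
    }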
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java Thu Nov 27 12:49:54 2014
@@ -24,8 +24,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
-import org.apache.pig.tools.pigstats.tez.TezStats;
public class TezExecutionEngine extends HExecutionEngine {
@@ -43,6 +43,6 @@ public class TezExecutionEngine extends
@Override
public PigStats instantiatePigStats() {
- return new TezStats(pigContext);
+ return new TezPigScriptStats(pigContext);
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Thu Nov 27 12:49:54 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -32,14 +31,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.CounterGroup;
-import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
@@ -64,19 +62,39 @@ public class TezJob implements Runnable
private TezClient tezClient;
private boolean reuseSession;
private TezCounters dagCounters;
- // Vertex, CounterGroup, Counter, Value
- private Map<String, Map<String, Map<String, Long>>> vertexCounters;
+
// Timer for DAG status reporter
private Timer timer;
+ private TezJobConfig tezJobConf;
+ private TezPigScriptStats pigStats;
- public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
- throws IOException {
+ public TezJob(TezConfiguration conf, DAG dag,
+ Map<String, LocalResource> requestAMResources,
+ int estimatedTotalParallelism) throws IOException {
this.conf = conf;
this.dag = dag;
this.requestAMResources = requestAMResources;
- this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true);
+ this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- this.vertexCounters = Maps.newHashMap();
+ tezJobConf = new TezJobConfig(estimatedTotalParallelism);
+ }
+
+ static class TezJobConfig {
+
+ private int estimatedTotalParallelism = -1;
+
+ public TezJobConfig(int estimatedTotalParallelism) {
+ this.estimatedTotalParallelism = estimatedTotalParallelism;
+ }
+
+ public int getEstimatedTotalParallelism() {
+ return estimatedTotalParallelism;
+ }
+
+ public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
+ this.estimatedTotalParallelism = estimatedTotalParallelism;
+ }
+
}
public DAG getDAG() {
@@ -84,7 +102,7 @@ public class TezJob implements Runnable
}
public String getName() {
- return dag == null ? "" : dag.getName();
+ return dag.getName();
}
public Configuration getConfiguration() {
@@ -103,14 +121,6 @@ public class TezJob implements Runnable
return dagCounters;
}
- public Map<String, Map<String, Long>> getVertexCounters(String group) {
- return vertexCounters.get(group);
- }
-
- public Map<String, Long> getVertexCounters(String group, String name) {
- return vertexCounters.get(group).get(name);
- }
-
public float getDAGProgress() {
Progress p = dagStatus.getDAGProgress();
return p == null ? 0 : (float)p.getSucceededTaskCount() / (float)p.getTotalTaskCount();
@@ -126,10 +136,28 @@ public class TezJob implements Runnable
return vertexProgress;
}
+ public VertexStatus getVertexStatus(String vertexName) {
+ VertexStatus vs = null;
+ try {
+ vs = dagClient.getVertexStatus(vertexName, statusGetOpts);
+ } catch (Exception e) {
+ // Don't fail the job even if vertex status couldn't
+ // be retrieved.
+ log.warn("Cannot retrieve status for vertex " + vertexName, e);
+ }
+ return vs;
+ }
+
+ public void setPigStats(TezPigScriptStats pigStats) {
+ this.pigStats = pigStats;
+ }
+
@Override
public void run() {
+ UDFContext udfContext = UDFContext.getUDFContext();
try {
- tezClient = TezSessionManager.getClient(conf, requestAMResources, dag.getCredentials());
+ tezClient = TezSessionManager.getClient(conf, requestAMResources,
+ dag.getCredentials(), tezJobConf);
log.info("Submitting DAG " + dag.getName());
dagClient = tezClient.submitDAG(dag);
appId = tezClient.getAppMasterApplicationId();
@@ -145,7 +173,7 @@ public class TezJob implements Runnable
timer = new Timer();
timer.schedule(new DAGStatusReporter(), 1000, conf.getLong(
- PigConfiguration.TEZ_DAG_STATUS_REPORT_INTERVAL, 10) * 1000);
+ PigConfiguration.PIG_TEZ_DAG_STATUS_REPORT_INTERVAL, 20) * 1000);
while (true) {
try {
@@ -156,10 +184,18 @@ public class TezJob implements Runnable
}
if (dagStatus.isCompleted()) {
+ // For tez_local mode where PigProcessor destroys all UDFContext
+ UDFContext.setUdfContext(udfContext);
+
+ log.info("DAG Status: " + dagStatus);
dagCounters = dagStatus.getDAGCounters();
- collectVertexCounters();
TezSessionManager.freeSession(tezClient);
try {
+ pigStats.accumulateStats(this);
+ } catch (Exception e) {
+ log.warn("Exception while gathering stats", e);
+ }
+ try {
if (!reuseSession) {
TezSessionManager.stopSession(tezClient);
}
@@ -182,36 +218,17 @@ public class TezJob implements Runnable
}
private class DAGStatusReporter extends TimerTask {
+
+ private final String LINE_SEPARATOR = System.getProperty("line.separator");
+
@Override
public void run() {
- log.info("DAG Status: " + dagStatus);
- }
- }
-
- private void collectVertexCounters() {
- for (Vertex v : dag.getVertices()) {
- String name = v.getName();
- try {
- VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts);
- TezCounters counters = s.getVertexCounters();
- Map<String, Map<String, Long>> grpCounters = Maps.newHashMap();
- Iterator<CounterGroup> grpIt = counters.iterator();
- while (grpIt.hasNext()) {
- CounterGroup grp = grpIt.next();
- Iterator<TezCounter> cntIt = grp.iterator();
- Map<String, Long> cntMap = Maps.newHashMap();
- while (cntIt.hasNext()) {
- TezCounter cnt = cntIt.next();
- cntMap.put(cnt.getName(), cnt.getValue());
- }
- grpCounters.put(grp.getName(), cntMap);
- }
- vertexCounters.put(name, grpCounters);
- } catch (Exception e) {
- // Don't fail the job even if vertex counters couldn't
- // be retrieved.
- log.info("Cannot retrieve counters for vertex " + name, e);
- }
+ if (dagStatus == null) return;
+ String msg = "status=" + dagStatus.getState()
+ + ", progress=" + dagStatus.getDAGProgress()
+ + ", diagnostics="
+ + StringUtils.join(dagStatus.getDiagnostics(), LINE_SEPARATOR);
+ log.info("DAG Status: " + msg);
}
}
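The TezJob changes above drop the per-vertex counter collection in favor of on-demand getVertexStatus() calls and turn DAGStatusReporter into a compact periodic log line (the default report interval also moves from 10 to 20 seconds). A minimal sketch of the java.util.Timer pattern used for the reporter, with a hard-coded status string instead of a live DAGStatus:

    import java.util.Timer;
    import java.util.TimerTask;

    final class StatusReporterSketch {
        public static void main(String[] args) throws InterruptedException {
            Timer timer = new Timer(true);               // daemon timer so the example exits cleanly
            long intervalMs = 20 * 1000L;                // report interval in milliseconds
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    // the real reporter logs state, progress and joined diagnostics
                    System.out.println("DAG Status: status=RUNNING, progress=..., diagnostics=");
                }
            }, 1000, intervalMs);
            Thread.sleep(2500);                          // let the first report fire
            timer.cancel();
        }
    }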
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Thu Nov 27 12:49:54 2014
@@ -32,6 +32,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
import org.apache.pig.impl.PigContext;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
@@ -43,7 +46,6 @@ import org.apache.tez.dag.api.TezConfigu
*/
public class TezJobCompiler {
private static final Log log = LogFactory.getLog(TezJobCompiler.class);
- private static int dagIdentifier = 0;
private PigContext pigContext;
private TezConfiguration tezConf;
@@ -53,24 +55,22 @@ public class TezJobCompiler {
this.tezConf = new TezConfiguration(conf);
}
- public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
+ public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
throws IOException, YarnException {
- String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
- DAG tezDag = DAG.create(jobName + "-" + dagIdentifier);
- dagIdentifier++;
+ DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
tezDag.setCredentials(new Credentials());
- TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
+ TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
dagBuilder.visit();
return tezDag;
}
- public TezJob compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer)
+ public TezJob compile(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
throws JobCreationException {
TezJob job = null;
try {
// A single Tez job always packs only 1 Tez plan. We will track
// the Tez job asynchronously to exploit parallel execution opportunities.
- job = getJob(tezPlan, planContainer);
+ job = getJob(tezPlanNode, planContainer);
} catch (JobCreationException jce) {
throw jce;
} catch(Exception e) {
@@ -82,11 +82,12 @@ public class TezJobCompiler {
return job;
}
- private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
+ private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
throws JobCreationException {
try {
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.putAll(planContainer.getLocalResources());
+ TezOperPlan tezPlan = tezPlanNode.getTezOperPlan();
localResources.putAll(tezPlan.getExtraResources());
String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
if (shipFiles != null) {
@@ -101,8 +102,11 @@ public class TezJobCompiler {
TezResourceManager.getInstance().addTezResource(new Path(new URI(file.trim())).toUri());
}
}
- DAG tezDag = buildDAG(tezPlan, localResources);
- return new TezJob(tezConf, tezDag, localResources);
+ for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
+ log.info("Local resource: " + entry.getKey());
+ }
+ DAG tezDag = buildDAG(tezPlanNode, localResources);
+ return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
} catch (Exception e) {
int errCode = 2017;
String msg = "Internal error creating job configuration.";