You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC
svn commit: r1784224 [5/17] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Feb 24 03:34:37 2017
@@ -97,7 +97,7 @@ public class POFRJoin extends PhysicalOp
// The array of hash tables, one per replicated input. replicates[fragment] is
// null, because fragment is the input which is fragmented and not replicated.
- protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates;
+ protected transient TupleToMapKey replicates[];
// variable which denotes whether we are returning tuples from the foreach
// operator
protected transient boolean processingPlan;
@@ -234,10 +234,7 @@ public class POFRJoin extends PhysicalOp
Result res = null;
Result inp = null;
if (!setUp) {
- replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size());
- for (int i = 0 ; i < phyPlanLists.size(); i++) {
- replicates.add(null);
- }
+ replicates = new TupleToMapKey[phyPlanLists.size()];
dumTup = mTupleFactory.newTuple(1);
setUpHashMap();
setUp = true;
@@ -285,7 +282,8 @@ public class POFRJoin extends PhysicalOp
return new Result();
}
Tuple lrOutTuple = (Tuple) lrOut.result;
- Object key = lrOutTuple.get(1);
+ Tuple key = mTupleFactory.newTuple(1);
+ key.set(0, lrOutTuple.get(1));
Tuple value = getValueTuple(lr, lrOutTuple);
lr.detachInput();
// Configure the for each operator with the relevant bags
@@ -298,7 +296,7 @@ public class POFRJoin extends PhysicalOp
ce.setValue(value);
continue;
}
- Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i);
+ TupleToMapKey replicate = replicates[i];
if (replicate.get(key) == null) {
if (isLeftOuterJoin) {
ce.setValue(nullBag);
@@ -306,7 +304,7 @@ public class POFRJoin extends PhysicalOp
noMatch = true;
break;
}
- ce.setValue(new NonSpillableDataBag(replicate.get(key)));
+ ce.setValue(new NonSpillableDataBag(replicate.get(key).getList()));
}
// If this is not LeftOuter Join and there was no match we
@@ -329,28 +327,27 @@ public class POFRJoin extends PhysicalOp
}
}
- protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> {
+ protected static class TupleToMapKey {
+ private HashMap<Tuple, TuplesToSchemaTupleList> tuples;
private SchemaTupleFactory tf;
public TupleToMapKey(int ct, SchemaTupleFactory tf) {
- super(ct);
+ tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct);
this.tf = tf;
}
- @Override
- public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) {
- if (tf != null && key instanceof Tuple) {
- key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
+ public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) {
+ if (tf != null) {
+ key = TuplesToSchemaTupleList.convert(key, tf);
}
- return (TuplesToSchemaTupleList) super.put(key, val);
+ return tuples.put(key, val);
}
- @Override
- public TuplesToSchemaTupleList get(Object key) {
- if (tf != null && key instanceof Tuple) {
- key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
+ public TuplesToSchemaTupleList get(Tuple key) {
+ if (tf != null) {
+ key = TuplesToSchemaTupleList.convert(key, tf);
}
- return (TuplesToSchemaTupleList) super.get(key);
+ return tuples.get(key);
}
}
@@ -385,7 +382,7 @@ public class POFRJoin extends PhysicalOp
SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
if (i == fragment) {
- replicates.set(i, null);
+ replicates[i] = null;
continue;
}
@@ -404,34 +401,25 @@ public class POFRJoin extends PhysicalOp
POLocalRearrange lr = LRs[i];
lr.setInputs(Arrays.asList((PhysicalOperator) ld));
- Map<Object, ArrayList<Tuple>> replicate;
- if (keySchemaTupleFactory == null) {
- replicate = new HashMap<Object, ArrayList<Tuple>>(1000);
- } else {
- replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
- }
+ TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
log.debug("Completed setup. Trying to build replication hash table");
for (Result res = lr.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNextTuple()) {
if (getReporter() != null)
getReporter().progress();
Tuple tuple = (Tuple) res.result;
- Object key = tuple.get(1);
- if (isKeyNull(key)) continue;
+ if (isKeyNull(tuple.get(1))) continue;
+ Tuple key = mTupleFactory.newTuple(1);
+ key.set(0, tuple.get(1));
Tuple value = getValueTuple(lr, tuple);
- ArrayList<Tuple> values = replicate.get(key);
- if (values == null) {
- if (inputSchemaTupleFactory == null) {
- values = new ArrayList<Tuple>(1);
- } else {
- values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
- }
- replicate.put(key, values);
+ if (replicate.get(key) == null) {
+ replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
}
- values.add(value);
+
+ replicate.get(key).add(value);
}
- replicates.set(i, replicate);
+ replicates[i] = replicate;
}
long time2 = System.currentTimeMillis();
log.debug("Hash Table built. Time taken: " + (time2 - time1));
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Fri Feb 24 03:34:37 2017
@@ -51,7 +51,7 @@ public class POFRJoinSpark extends POFRJ
addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i);
}
- replicates.set(fragment, null);
+ replicates[fragment] = null;
int i = -1;
long start = System.currentTimeMillis();
for (int k = 0; k < inputSchemas.length; ++k) {
@@ -61,7 +61,7 @@ public class POFRJoinSpark extends POFRJ
SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
if (i == fragment) {
- replicates.set(fragment, null);
+ replicates[i] = null;
continue;
}
@@ -91,7 +91,7 @@ public class POFRJoinSpark extends POFRJ
replicate.get(key).add(value);
}
- replicates.set(i, replicate);
+ replicates[i] = replicate;
}
long end = System.currentTimeMillis();
log.debug("Hash Table built. Time taken: " + (end - start));
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Feb 24 03:34:37 2017
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -56,7 +55,6 @@ import org.apache.pig.pen.util.LineageTr
@SuppressWarnings("unchecked")
public class POForEach extends PhysicalOperator {
private static final long serialVersionUID = 1L;
- private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
protected List<PhysicalPlan> inputPlans;
@@ -266,7 +264,7 @@ public class POForEach extends PhysicalO
if (inp.returnStatus == POStatus.STATUS_EOP) {
if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) {
// continue pull one more output
- inp = UNLIMITED_NULL_RESULT;
+ inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
} else {
return inp;
}
@@ -443,8 +441,6 @@ public class POForEach extends PhysicalO
if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
its[i] = ((DataBag)bags[i]).iterator();
- } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
- its[i] = ((Map)bags[i]).entrySet().iterator();
} else {
its[i] = null;
}
@@ -470,7 +466,7 @@ public class POForEach extends PhysicalO
//we instantiate the template array and start populating it with data
data = new Object[noItems];
for(int i = 0; i < noItems; ++i) {
- if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
+ if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) {
if(its[i].hasNext()) {
data[i] = its[i].next();
} else {
@@ -544,15 +540,6 @@ public class POForEach extends PhysicalO
out.append(t.get(j));
}
}
- } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) {
- Map.Entry entry = (Map.Entry)in;
- if (knownSize) {
- out.set(idx++, entry.getKey());
- out.set(idx++, entry.getValue());
- } else {
- out.append(entry.getKey());
- out.append(entry.getValue());
- }
} else {
if (knownSize) {
out.set(idx++, in);
@@ -751,12 +738,9 @@ public class POForEach extends PhysicalO
opsToBeReset.add(sort);
}
- @Override
- public void visitCross(POCross c) throws VisitorException {
- // FIXME: add only if limit is present
- opsToBeReset.add(c);
- }
-
+ /* (non-Javadoc)
+ * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
+ */
@Override
public void visitProject(POProject proj) throws VisitorException {
if(proj instanceof PORelationToExprProject) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Feb 24 03:34:37 2017
@@ -56,11 +56,11 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.logical.relational.LOJoin;
-/** This operator implements merge join algorithm to do map side joins.
+/** This operator implements merge join algorithm to do map side joins.
* Currently, only two-way joins are supported. One input of join is identified as left
* and other is identified as right. Left input tuples are the input records in map.
* Right tuples are read from HDFS by opening right stream.
- *
+ *
* This join doesn't support outer join.
* Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
*/
@@ -99,7 +99,7 @@ public class POMergeJoin extends Physica
private FuncSpec rightLoaderFuncSpec;
private String rightInputFileName;
-
+
private String indexFile;
// Buffer to hold accumulated left tuples.
@@ -249,11 +249,12 @@ public class POMergeJoin extends Physica
* from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples
* from the source, though in the future that is what we would like to do.
*/
- public static class TuplesToSchemaTupleList extends ArrayList<Tuple> {
+ public static class TuplesToSchemaTupleList {
+ private List<Tuple> tuples;
private SchemaTupleFactory tf;
public TuplesToSchemaTupleList(int ct, TupleMaker<?> tf) {
- super(ct);
+ tuples = new ArrayList<Tuple>(ct);
if (tf instanceof SchemaTupleFactory) {
this.tf = (SchemaTupleFactory)tf;
}
@@ -272,24 +273,24 @@ public class POMergeJoin extends Physica
}
}
- @Override
public boolean add(Tuple t) {
if (tf != null) {
t = convert(t, tf);
}
- return super.add(t);
+ return tuples.add(t);
}
- @Override
public Tuple get(int i) {
- return super.get(i);
+ return tuples.get(i);
}
- @Override
public int size() {
- return super.size();
+ return tuples.size();
}
+ public List<Tuple> getList() {
+ return tuples;
+ }
}
@SuppressWarnings("unchecked")
@@ -356,7 +357,7 @@ public class POMergeJoin extends Physica
}
else{
Object rightKey = extractKeysFromTuple(rightInp, 1);
- if(null == rightKey) // If we see tuple having null keys in stream, we drop them
+ if(null == rightKey) // If we see tuple having null keys in stream, we drop them
continue; // and fetch next tuple.
int cmpval = ((Comparable)rightKey).compareTo(curJoinKey);
@@ -398,7 +399,7 @@ public class POMergeJoin extends Physica
"Last two tuples encountered were: \n"+
curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
throw new ExecException(errMsg,errCode);
- }
+ }
}
}
}
@@ -429,17 +430,17 @@ public class POMergeJoin extends Physica
prevLeftKey+ "\n" + curLeftKey ;
throw new ExecException(errMsg,errCode);
}
-
+
case POStatus.STATUS_EOP:
if(this.parentPlan.endOfAllInput || isEndOfInput()){
- // We hit the end on left input.
+ // We hit the end on left input.
// Tuples in bag may still possibly join with right side.
curJoinKey = prevLeftKey;
curLeftKey = null;
if (isEndOfInput()) {
leftInputConsumedInSpark = true;
}
- break;
+ break;
}
else // Fetch next left input.
return curLeftInp;
@@ -464,7 +465,7 @@ public class POMergeJoin extends Physica
// Accumulated tuples with same key on left side.
// But since we are reading ahead we still haven't checked the read ahead right tuple.
// Accumulated left tuples may potentially join with that. So, lets check that first.
-
+
if((null != prevRightKey) && prevRightKey.equals(prevLeftKey)){
curJoiningRightTup = (Tuple)prevRightInp.result;
@@ -486,17 +487,17 @@ public class POMergeJoin extends Physica
slidingToNextRecord = false;
} else
rightInp = getNextRightInp(prevLeftKey);
-
+
if(rightInp.returnStatus != POStatus.STATUS_OK)
return rightInp;
Object extractedRightKey = extractKeysFromTuple(rightInp, 1);
-
- if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
+
+ if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
continue; // and fetch next tuple.
-
+
Comparable rightKey = (Comparable)extractedRightKey;
-
+
if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
// Sanity check.
int errCode = 1102;
@@ -527,7 +528,7 @@ public class POMergeJoin extends Physica
else{ // We got ahead on right side. Store currently read right tuple.
prevRightKey = rightKey;
prevRightInp = rightInp;
- // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
+ // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
leftTuples = newLeftTupleArray();
leftTuples.add((Tuple)curLeftInp.result);
prevLeftInp = curLeftInp;
@@ -554,7 +555,7 @@ public class POMergeJoin extends Physica
DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
loader.setIndexFile(indexFile);
}
-
+
// Pass signature of the loader to rightLoader
// make a copy of the conf to use in calls to rightLoader.
rightLoader.setUDFContextSignature(signature);
@@ -607,11 +608,11 @@ public class POMergeJoin extends Physica
// run the tuple through the pipeline
rightPipelineRoot.attachInput(t);
return this.getNextRightInp();
-
+
}
default: // We don't deal with ERR/NULL. just pass them down
throwProcessingException(false, null);
-
+
}
}
} catch (IOException e) {
@@ -642,8 +643,8 @@ public class POMergeJoin extends Physica
int errCode = 2167;
String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
throw new ExecException(errMsg,errCode,PigException.BUG);
- }
-
+ }
+
return ((Tuple) lrOut.result).get(1);
}
@@ -659,7 +660,7 @@ public class POMergeJoin extends Physica
noInnerPlanOnRightSide = false;
this.rightPipelineLeaf = rightPipeline.getLeaves().get(0);
this.rightPipelineRoot = rightPipeline.getRoots().get(0);
- this.rightPipelineRoot.setInputs(null);
+ this.rightPipelineRoot.setInputs(null);
}
else
noInnerPlanOnRightSide = true;
@@ -710,18 +711,18 @@ public class POMergeJoin extends Physica
public boolean supportsMultipleOutputs() {
return false;
}
-
+
/**
* @param rightInputFileName the rightInputFileName to set
*/
public void setRightInputFileName(String rightInputFileName) {
this.rightInputFileName = rightInputFileName;
}
-
+
public String getSignature() {
return signature;
}
-
+
public void setSignature(String signature) {
this.signature = signature;
}
@@ -733,12 +734,12 @@ public class POMergeJoin extends Physica
public String getIndexFile() {
return indexFile;
}
-
+
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
return null;
}
-
+
public LOJoin.JOINTYPE getJoinType() {
return joinType;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Fri Feb 24 03:34:37 2017
@@ -44,9 +44,6 @@ public class POPoissonSample extends Phy
private transient boolean initialized;
- // num of rows skipped so far
- private transient int numSkipped;
-
// num of rows sampled so far
private transient int numRowsSampled;
@@ -92,7 +89,6 @@ public class POPoissonSample extends Phy
@Override
public Result getNextTuple() throws ExecException {
if (!initialized) {
- numSkipped = 0;
numRowsSampled = 0;
avgTupleMemSz = 0;
rowNum = 0;
@@ -138,7 +134,7 @@ public class POPoissonSample extends Phy
}
// skip tuples
- while (numSkipped < skipInterval) {
+ for (long numSkipped = 0; numSkipped < skipInterval; numSkipped++) {
res = processInput();
if (res.returnStatus == POStatus.STATUS_NULL) {
continue;
@@ -152,7 +148,6 @@ public class POPoissonSample extends Phy
return res;
}
rowNum++;
- numSkipped++;
}
// skipped enough, get new sample
@@ -178,8 +173,6 @@ public class POPoissonSample extends Phy
rowNum++;
newSample = res;
- // reset skipped
- numSkipped = 0;
return currentSample;
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Fri Feb 24 03:34:37 2017
@@ -125,7 +125,7 @@ public class POReservoirSample extends P
}
// collect samples until input is exhausted
- int rand = randGen.nextInt(rowProcessed + 1);
+ int rand = randGen.nextInt(rowProcessed);
if (rand < numSamples) {
samples[rand] = res;
}
@@ -133,13 +133,8 @@ public class POReservoirSample extends P
}
}
- if (res.returnStatus == POStatus.STATUS_EOP) {
- if (this.parentPlan.endOfAllInput) {
- sampleCollectionDone = true;
- } else {
- // In case of Split can get EOP in between.
- return res;
- }
+ if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) {
+ sampleCollectionDone = true;
}
return getSample();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Fri Feb 24 03:34:37 2017
@@ -51,13 +51,13 @@ public class Packager implements Illustr
protected DataBag[] bags;
public static enum PackageType {
- GROUP, JOIN, BLOOMJOIN
+ GROUP, JOIN
};
protected transient Illustrator illustrator = null;
// The key being worked on
- protected Object key;
+ Object key;
// marker to indicate if key is a tuple
protected boolean isKeyTuple = false;
@@ -65,7 +65,7 @@ public class Packager implements Illustr
protected boolean isKeyCompound = false;
// key's type
- protected byte keyType;
+ byte keyType;
// The number of inputs to this
// co-group. 0 indicates a distinct, which means there will only be a
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java Fri Feb 24 03:34:37 2017
@@ -60,7 +60,7 @@ public class StoreFuncDecorator {
private boolean allowErrors() {
return UDFContext.getUDFContext().getJobConf()
- .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, false);
+ .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false);
}
/**
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Fri Feb 24 03:34:37 2017
@@ -162,13 +162,8 @@ public class LoadConverter implements RD
private SparkEngineConf sparkEngineConf;
private boolean initialized;
- //LoadConverter#ToTupleFunction is executed more than once in multiquery case causing
- //invalid number of input records, 'skip' flag below indicates first load is finished.
- private boolean skip;
-
public ToTupleFunction(SparkEngineConf sparkEngineConf){
this.sparkEngineConf = sparkEngineConf;
-
}
@Override
@@ -177,14 +172,9 @@ public class LoadConverter implements RD
long partitionId = TaskContext.get().partitionId();
PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId));
- //We're in POSplit and already counted all input records,
- //in a multiquery case skip will be set to true after the first load is finished:
- if (sparkCounters != null && SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName, counterName).getValue() > 0) {
- skip=true;
- }
initialized = true;
}
- if (sparkCounters != null && disableCounter == false && skip == false) {
+ if (sparkCounters != null && disableCounter == false) {
sparkCounters.increment(counterGroupName, counterName, 1L);
}
return v1._2();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Feb 24 03:34:37 2017
@@ -19,14 +19,13 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
@@ -44,7 +43,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -58,7 +56,6 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
@@ -90,6 +87,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -110,6 +108,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
@@ -175,7 +174,6 @@ public class TezDagBuilder extends TezOp
private PigContext pc;
private Configuration globalConf;
private Configuration pigContextConf;
- private Configuration shuffleVertexManagerBaseConf;
private FileSystem fs;
private long intermediateTaskInputSize;
private Set<String> inputSplitInDiskVertices;
@@ -193,8 +191,6 @@ public class TezDagBuilder extends TezOp
private String mapTaskLaunchCmdOpts;
private String reduceTaskLaunchCmdOpts;
- private boolean disableDAGRecovery = false;
-
public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
Map<String, LocalResource> localResources) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
@@ -214,10 +210,6 @@ public class TezDagBuilder extends TezOp
}
}
- public boolean shouldDisableDAGRecovery() {
- return disableDAGRecovery;
- }
-
private void initialize(PigContext pc) throws IOException {
this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
@@ -225,16 +217,6 @@ public class TezDagBuilder extends TezOp
this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
MRToTezHelper.processMRSettings(pigContextConf, globalConf);
- shuffleVertexManagerBaseConf = new Configuration(false);
- // Only copy tez.shuffle-vertex-manager config to keep payload size small
- Iterator<Entry<String, String>> iter = pigContextConf.iterator();
- while (iter.hasNext()) {
- Entry<String, String> entry = iter.next();
- if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) {
- shuffleVertexManagerBaseConf.set(entry.getKey(), entry.getValue());
- }
- }
-
// Add credentials from binary token file and get tokens for namenodes
// specified in mapreduce.job.hdfs-servers
SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
@@ -283,7 +265,7 @@ public class TezDagBuilder extends TezOp
if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) {
// If tez setting is not defined
MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
- MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, false);
+ MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true);
}
if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) {
@@ -297,7 +279,7 @@ public class TezDagBuilder extends TezOp
try {
fs = FileSystem.get(globalConf);
- intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc));
+ intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc));
} catch (Exception e) {
log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
intermediateTaskInputSize = 134217728L;
@@ -415,11 +397,7 @@ public class TezDagBuilder extends TezOp
tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup);
POStore store = tezOp.getVertexGroupInfo().getStore();
if (store != null) {
- String outputKey = store.getOperatorKey().toString();
- if (store instanceof POStoreTez) {
- outputKey = ((POStoreTez) store).getOutputKey();
- }
- vertexGroup.addDataSink(outputKey,
+ vertexGroup.addDataSink(store.getOperatorKey().toString(),
DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials()));
}
@@ -463,14 +441,7 @@ public class TezDagBuilder extends TezOp
Configuration conf = new Configuration(pigContextConf);
- if (edge.needsDistinctCombiner()) {
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS,
- MRCombiner.class.getName());
- conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
- DistinctCombiner.Combine.class.getName());
- log.info("Setting distinct combiner class between "
- + from.getOperatorKey() + " and " + to.getOperatorKey());
- } else if (!combinePlan.isEmpty()) {
+ if (!combinePlan.isEmpty()) {
udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC);
addCombiner(combinePlan, to, conf, isMergedInput);
}
@@ -479,7 +450,7 @@ public class TezDagBuilder extends TezOp
POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
- if (lr.containsOutputKey(to.getOperatorKey().toString())) {
+ if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
byte keyType = lr.getKeyType();
setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
// In case of secondary key sort, main key type is the actual key type
@@ -508,8 +479,7 @@ public class TezDagBuilder extends TezOp
conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
- conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
- conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
+ conf.set("pig.pigContext", serializedPigContext);
conf.set("udf.import.list", serializedUDFImportList);
if(to.isGlobalSort() || to.isLimitAfterSort()){
@@ -540,36 +510,26 @@ public class TezDagBuilder extends TezOp
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
out.setUserPayload(payLoad);
- in.setUserPayload(payLoad);
- // Remove combiner and reset payload
if (!combinePlan.isEmpty()) {
boolean noCombineInReducer = false;
- boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap();
String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
- if (edge.getCombinerInReducer() != null) {
- noCombineInReducer = !edge.getCombinerInReducer();
- } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+ if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
} else {
noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
}
- if (noCombineInReducer || noCombineInMapper) {
+ if (noCombineInReducer) {
log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
conf.unset("pig.combinePlan");
conf.unset("pig.combine.package");
conf.unset("pig.map.keytype");
- UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf);
- if (noCombineInMapper) {
- out.setUserPayload(payLoadWithoutCombiner);
- }
- if (noCombineInReducer) {
- in.setUserPayload(payLoadWithoutCombiner);
- }
+ payLoad = TezUtils.createUserPayloadFromConf(conf);
}
}
+ in.setUserPayload(payLoad);
if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
@@ -633,8 +593,6 @@ public class TezDagBuilder extends TezOp
setOutputFormat(job);
payloadConf.set("udf.import.list", serializedUDFImportList);
payloadConf.set("exectype", "TEZ");
- payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
- payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
// Process stores
LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -653,7 +611,11 @@ public class TezDagBuilder extends TezOp
payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
inputPayLoad = new Configuration(payloadConf);
+ if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) {
+ inputPayLoad.set("pig.pigContext", serializedPigContext);
+ }
}
+ payloadConf.set("pig.pigContext", serializedPigContext);
if (tezOp.getSampleOperator() != null) {
payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
@@ -727,7 +689,7 @@ public class TezDagBuilder extends TezOp
PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
if (lr.isConnectedToPackage()
- && lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
+ && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
localRearrangeMap.put((int) lr.getIndex(), inputKey);
if (isVertexGroup) {
isMergedInput = true;
@@ -810,25 +772,9 @@ public class TezDagBuilder extends TezOp
String vmPluginName = null;
Configuration vmPluginConf = null;
- boolean containScatterGather = false;
- boolean containCustomPartitioner = false;
- for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
- if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
- containScatterGather = true;
- }
- if (edge.partitionerClass != null) {
- containCustomPartitioner = true;
- }
- }
-
- if(containScatterGather) {
- vmPluginName = ShuffleVertexManager.class.getName();
- vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
- }
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
- boolean autoParallelism = false;
if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
if (tezOp.getVertexParallelism()==-1 && (
tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1||
@@ -837,12 +783,33 @@ public class TezDagBuilder extends TezOp
// to decrease/increase parallelism of sorting vertex dynamically
// based on the numQuantiles calculated by sample aggregation vertex
vmPluginName = PartitionerDefinedVertexManager.class.getName();
- autoParallelism = true;
log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
}
} else {
+ boolean containScatterGather = false;
+ boolean containCustomPartitioner = false;
+ for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+ if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
+ containScatterGather = true;
+ }
+ if (edge.partitionerClass!=null) {
+ containCustomPartitioner = true;
+ }
+ }
if (containScatterGather && !containCustomPartitioner) {
-
+ vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf;
+ // Use auto-parallelism feature of ShuffleVertexManager to dynamically
+ // reduce the parallelism of the vertex
+ if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+ && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
+ vmPluginName = PigGraceShuffleVertexManager.class.getName();
+ tezOp.setUseGraceParallelism(true);
+ vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
+ vmPluginConf.set("pig.pigContext", serializedPigContext);
+ } else {
+ vmPluginName = ShuffleVertexManager.class.getName();
+ }
+ vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
// For Intermediate reduce, set the bytes per reducer to be block size.
long bytesPerReducer = intermediateTaskInputSize;
// If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user.
@@ -851,8 +818,8 @@ public class TezDagBuilder extends TezOp
// In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce
// as map input sizes are mostly always high compared to map output.
if (stores.size() > 0) {
- if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
- bytesPerReducer = pigContextConf.getLong(
+ if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+ bytesPerReducer = vmPluginConf.getLong(
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
} else if (tezOp.isGroupBy()) {
@@ -861,28 +828,10 @@ public class TezDagBuilder extends TezOp
bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
}
}
-
- // Use auto-parallelism feature of ShuffleVertexManager to dynamically
- // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager
- // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on
- if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
- && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
- && tezOp.getCrossKeys() == null) {
- vmPluginName = PigGraceShuffleVertexManager.class.getName();
- tezOp.setUseGraceParallelism(true);
- vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
- vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
- vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer);
- }
- vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
- autoParallelism = true;
log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
}
}
- if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) {
- disableDAGRecovery = true;
- }
}
if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
@@ -1460,12 +1409,22 @@ public class TezDagBuilder extends TezOp
private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
// the OutputFormat we report to Hadoop is always PigOutputFormat which
- // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
+ // can be wrapped with LazyOutputFormat provided if it is supported by
+ // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
- LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class);
+ try {
+ Class<?> clazz = PigContext
+ .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+ Method method = clazz.getMethod("setOutputFormatClass",
+ org.apache.hadoop.mapreduce.Job.class, Class.class);
+ method.invoke(null, job, PigOutputFormatTez.class);
+ } catch (Exception e) {
+ job.setOutputFormatClass(PigOutputFormatTez.class);
+ log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+ + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+ }
} else {
job.setOutputFormatClass(PigOutputFormatTez.class);
}
}
-
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Feb 24 03:34:37 2017
@@ -30,11 +30,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.tez.client.TezClient;
@@ -56,7 +51,7 @@ import com.google.common.collect.Maps;
*/
public class TezJob implements Runnable {
private static final Log log = LogFactory.getLog(TezJob.class);
- private TezConfiguration conf;
+ private Configuration conf;
private EnumSet<StatusGetOpts> statusGetOpts;
private Map<String, LocalResource> requestAMResources;
private ApplicationId appId;
@@ -74,71 +69,31 @@ public class TezJob implements Runnable
public TezJob(TezConfiguration conf, DAG dag,
Map<String, LocalResource> requestAMResources,
- TezOperPlan tezPlan) throws IOException {
+ int estimatedTotalParallelism) throws IOException {
this.conf = conf;
this.dag = dag;
this.requestAMResources = requestAMResources;
this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- tezJobConf = new TezJobConfig(tezPlan);
+ tezJobConf = new TezJobConfig(estimatedTotalParallelism);
}
static class TezJobConfig {
private int estimatedTotalParallelism = -1;
- private int maxOutputsinSingleVertex;
- private int totalVertices = 0;
- public TezJobConfig(TezOperPlan tezPlan) throws VisitorException {
- this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism();
- MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan);
- finder.visit();
- this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex();
- this.totalVertices = finder.getTotalVertices();
+ public TezJobConfig(int estimatedTotalParallelism) {
+ this.estimatedTotalParallelism = estimatedTotalParallelism;
}
public int getEstimatedTotalParallelism() {
return estimatedTotalParallelism;
}
- public int getMaxOutputsinSingleVertex() {
- return maxOutputsinSingleVertex;
+ public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
+ this.estimatedTotalParallelism = estimatedTotalParallelism;
}
- public int getTotalVertices() {
- return totalVertices;
- }
-
- }
-
- private static class MaxOutputsFinder extends TezOpPlanVisitor {
-
- private int maxOutputsinSingleVertex = 1;
- private int totalVertices = 0;
-
- public MaxOutputsFinder(TezOperPlan plan) {
- super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
- }
-
- public int getMaxOutputsinSingleVertex() {
- return maxOutputsinSingleVertex;
- }
-
- public int getTotalVertices() {
- return totalVertices;
- }
-
- @Override
- public void visitTezOp(TezOperator tezOperator) throws VisitorException {
- if (!tezOperator.isVertexGroup()) {
- totalVertices++;
- int outputs = tezOperator.outEdges.keySet().size();
- maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs ? maxOutputsinSingleVertex : outputs;
- }
- }
-
-
-
}
public DAG getDAG() {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Feb 24 03:34:37 2017
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -31,7 +30,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.pig.PigException;
-import org.apache.pig.backend.hadoop.PigATSClient;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
@@ -52,12 +50,11 @@ public class TezJobCompiler {
private static final Log log = LogFactory.getLog(TezJobCompiler.class);
private PigContext pigContext;
- private Configuration conf;
- private boolean disableDAGRecovery;
+ private TezConfiguration tezConf;
public TezJobCompiler(PigContext pigContext, Configuration conf) throws IOException {
this.pigContext = pigContext;
- this.conf = conf;
+ this.tezConf = new TezConfiguration(conf);
}
public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
@@ -67,7 +64,6 @@ public class TezJobCompiler {
TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
dagBuilder.visit();
dagBuilder.avoidContainerReuseIfInputSplitInDisk();
- disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery();
return tezDag;
}
@@ -89,7 +85,6 @@ public class TezJobCompiler {
return job;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
throws JobCreationException {
try {
@@ -112,34 +107,8 @@ public class TezJobCompiler {
}
DAG tezDag = buildDAG(tezPlanNode, localResources);
tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
- // set Tez caller context
- // Reflection for the following code since it is only available since tez 0.8.1:
- // CallerContext context = CallerContext.create(ATSService.CallerContext, ATSService.getPigAuditId(pigContext),
- // ATSService.EntityType, "");
- // tezDag.setCallerContext(context);
- Class callerContextClass = null;
- try {
- callerContextClass = Class.forName("org.apache.tez.client.CallerContext");
- } catch (ClassNotFoundException e) {
- // If pre-Tez 0.8.1, skip setting CallerContext
- }
- if (callerContextClass != null) {
- Method builderBuildMethod = callerContextClass.getMethod("create", String.class,
- String.class, String.class, String.class);
- Object context = builderBuildMethod.invoke(null, PigATSClient.CALLER_CONTEXT,
- PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, "");
- Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext",
- context.getClass());
- dagSetCallerContext.invoke(tezDag, context);
- }
log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
- TezConfiguration tezConf = new TezConfiguration(conf);
- if (disableDAGRecovery
- && tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
- TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
- tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
- }
- return new TezJob(tezConf, tezDag, localResources, tezPlan);
+ return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
} catch (Exception e) {
int errCode = 2017;
String msg = "Internal error creating job configuration.";
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Feb 24 03:34:37 2017
@@ -22,7 +22,6 @@ import java.io.PrintStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -167,7 +166,7 @@ public class TezLauncher extends Launche
tezStats = new TezPigScriptStats(pc);
PigStats.start(tezStats);
- conf.setIfUnset(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
+ conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
TezJobCompiler jc = new TezJobCompiler(pc, conf);
TezPlanContainer tezPlanContainer = compile(php, pc);
@@ -175,10 +174,6 @@ public class TezLauncher extends Launche
tezScriptState.emitInitialPlanNotification(tezPlanContainer);
tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch
- boolean stop_on_failure =
- Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false"));
- boolean stoppedOnFailure = false;
-
TezPlanContainerNode tezPlanContainerNode;
TezOperPlan tezPlan;
int processedDAGs = 0;
@@ -257,18 +252,7 @@ public class TezLauncher extends Launche
((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100);
}
handleUnCaughtException(pc);
- boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed();
- tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded);
- // if stop_on_failure is enabled, we need to stop immediately when any job has failed
- if (!tezDAGSucceeded) {
- if (stop_on_failure) {
- stoppedOnFailure = true;
- break;
- } else {
- log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you "
- + "want Pig to stop immediately on failure.");
- }
- }
+ tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
}
tezStats.finish();
@@ -295,11 +279,6 @@ public class TezLauncher extends Launche
}
}
- if (stoppedOnFailure) {
- throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017,
- PigException.REMOTE_ENVIRONMENT);
- }
-
return tezStats;
}
@@ -423,11 +402,9 @@ public class TezLauncher extends Launche
TezCompiler comp = new TezCompiler(php, pc);
comp.compile();
TezPlanContainer planContainer = comp.getPlanContainer();
- // Doing a sort so that test plan printed remains same between jdk7 and jdk8
- List<OperatorKey> opKeys = new ArrayList<>(planContainer.getKeys().keySet());
- Collections.sort(opKeys);
- for (OperatorKey opKey : opKeys) {
- TezOperPlan tezPlan = planContainer.getOperator(opKey).getTezOperPlan();
+ for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer
+ .getKeys().entrySet()) {
+ TezOperPlan tezPlan = entry.getValue().getTezOperPlan();
optimize(tezPlan, pc);
}
return planContainer;
@@ -522,7 +499,7 @@ public class TezLauncher extends Launche
@Override
public void killJob(String jobID, Configuration conf) throws BackendException {
- if (runningJob != null && runningJob.getApplicationId().toString().equals(jobID)) {
+ if (runningJob != null && runningJob.getApplicationId().toString() == jobID) {
try {
runningJob.killJob();
} catch (Exception e) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Fri Feb 24 03:34:37 2017
@@ -39,8 +39,6 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
-import com.google.common.annotations.VisibleForTesting;
-
public class TezResourceManager {
private static TezResourceManager instance = null;
private boolean inited = false;
@@ -61,7 +59,6 @@ public class TezResourceManager {
/**
* This method is only used by test code to reset state.
*/
- @VisibleForTesting
public static void dropInstance() {
instance = null;
}
@@ -69,7 +66,7 @@ public class TezResourceManager {
public void init(PigContext pigContext, Configuration conf) throws IOException {
if (!inited) {
this.resourcesDir = FileLocalizer.getTemporaryResourcePath(pigContext);
- this.remoteFs = resourcesDir.getFileSystem(conf);
+ this.remoteFs = FileSystem.get(conf);
this.conf = conf;
this.pigContext = pigContext;
this.inited = true;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Feb 24 03:34:37 2017
@@ -18,9 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -31,11 +29,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.client.TezAppMasterStatus;
@@ -50,13 +46,13 @@ public class TezSessionManager {
private static final Log log = LogFactory.getLog(TezSessionManager.class);
static {
- Utils.addShutdownHookWithPriority(new Runnable() {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
TezSessionManager.shutdown();
}
- }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+ });
}
private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock();
@@ -65,17 +61,11 @@ public class TezSessionManager {
private TezSessionManager() {
}
- private static class SessionInfo {
-
- public SessionInfo(TezClient session, TezConfiguration config, Map<String, LocalResource> resources) {
+ public static class SessionInfo {
+ SessionInfo(TezClient session, Map<String, LocalResource> resources) {
this.session = session;
- this.config = config;
this.resources = resources;
}
-
- public TezConfiguration getConfig() {
- return config;
- }
public Map<String, LocalResource> getResources() {
return resources;
}
@@ -87,23 +77,20 @@ public class TezSessionManager {
}
private TezClient session;
private Map<String, LocalResource> resources;
- private TezConfiguration config;
private boolean inUse = false;
}
private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
- private static SessionInfo createSession(TezConfiguration amConf,
+ private static SessionInfo createSession(Configuration conf,
Map<String, LocalResource> requestedAMResources, Credentials creds,
TezJobConfig tezJobConf) throws TezException, IOException,
InterruptedException {
- MRToTezHelper.translateMRSettingsForTezAM(amConf);
+ TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
TezScriptState ss = TezScriptState.get();
ss.addDAGSettingsToConf(amConf);
- if (amConf.getBoolean(PigConfiguration.PIG_TEZ_CONFIGURE_AM_MEMORY, true)) {
- adjustAMConfig(amConf, tezJobConf);
- }
- String jobName = amConf.get(PigContext.JOB_NAME, "pig");
+ adjustAMConfig(amConf, tezJobConf);
+ String jobName = conf.get(PigContext.JOB_NAME, "pig");
TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
try {
tezClient.start();
@@ -117,10 +104,12 @@ public class TezSessionManager {
tezClient.stop();
throw new RuntimeException(e);
}
- return new SessionInfo(tezClient, amConf, requestedAMResources);
+ return new SessionInfo(tezClient, requestedAMResources);
}
private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
+ int requiredAMMaxHeap = -1;
+ int requiredAMResourceMB = -1;
String amLaunchOpts = amConf.get(
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
@@ -133,10 +122,8 @@ public class TezSessionManager {
// Need more room for native memory/virtual address space
// when close to 4G due to 32-bit jvm 4G limit
- int maxAMHeap = Utils.is64bitJVM() ? 3584 : 3200;
- int maxAMResourceMB = 4096;
- int requiredAMResourceMB = maxAMResourceMB;
- int requiredAMMaxHeap = maxAMHeap;
+ int minAMMaxHeap = 3200;
+ int minAMResourceMB = 4096;
// Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
// Increment container size by 512 mb for every additional 5K tasks.
@@ -148,38 +135,22 @@ public class TezSessionManager {
// 5000 and above - 1024Xmx, 1536 (512 native memory)
for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) {
+ requiredAMMaxHeap = minAMMaxHeap;
+ requiredAMResourceMB = minAMResourceMB;
break;
}
- requiredAMResourceMB = requiredAMResourceMB - 512;
- requiredAMMaxHeap = requiredAMResourceMB - 512;
- }
-
- if (tezJobConf.getTotalVertices() > 30) {
- //Add 512 mb per 30 vertices
- int additionaMem = 512 * (tezJobConf.getTotalVertices() / 30);
- requiredAMResourceMB = requiredAMResourceMB + additionaMem;
- requiredAMMaxHeap = requiredAMResourceMB - 512;
- }
-
- if (tezJobConf.getMaxOutputsinSingleVertex() > 10) {
- //Add 256 mb per 5 outputs if a vertex has more than 10 outputs
- int additionaMem = 256 * (tezJobConf.getMaxOutputsinSingleVertex() / 5);
- requiredAMResourceMB = requiredAMResourceMB + additionaMem;
- requiredAMMaxHeap = requiredAMResourceMB - 512;
+ minAMResourceMB = minAMResourceMB - 512;
+ minAMMaxHeap = minAMResourceMB - 512;
}
- requiredAMResourceMB = Math.min(maxAMResourceMB, requiredAMResourceMB);
- requiredAMMaxHeap = Math.min(maxAMHeap, requiredAMMaxHeap);
-
if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB);
log.info("Increasing "
+ TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
+ configuredAMResourceMB + " to "
+ requiredAMResourceMB
- + " as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
- + ", total vertices = " + tezJobConf.getTotalVertices()
- + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
+ + " as the number of total estimated tasks is "
+ + tezJobConf.getEstimatedTotalParallelism());
if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
@@ -187,9 +158,8 @@ public class TezSessionManager {
log.info("Increasing Tez AM Heap Size from "
+ configuredAMMaxHeap + "M to "
+ requiredAMMaxHeap
- + "M as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
- + ", total vertices = " + tezJobConf.getTotalVertices()
- + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
+ + "M as the number of total estimated tasks is "
+ + tezJobConf.getEstimatedTotalParallelism());
log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now "
+ amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS));
}
@@ -208,22 +178,7 @@ public class TezSessionManager {
return true;
}
- private static boolean validateSessionConfig(SessionInfo currentSession,
- Configuration newSessionConfig)
- throws TezException, IOException {
- // If DAG recovery is disabled for one and enabled for another, do not reuse
- if (currentSession.getConfig().getBoolean(
- TezConfiguration.DAG_RECOVERY_ENABLED,
- TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)
- != newSessionConfig.getBoolean(
- TezConfiguration.DAG_RECOVERY_ENABLED,
- TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
- return false;
- }
- return true;
- }
-
- static TezClient getClient(TezConfiguration conf, Map<String, LocalResource> requestedAMResources,
+ static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources,
Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException {
List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
SessionInfo newSession = null;
@@ -241,8 +196,7 @@ public class TezSessionManager {
sessionsToRemove.add(sessionInfo);
} else if (!sessionInfo.inUse
&& appMasterStatus.equals(TezAppMasterStatus.READY)
- && validateSessionResources(sessionInfo,requestedAMResources)
- && validateSessionConfig(sessionInfo, conf)) {
+ && validateSessionResources(sessionInfo,requestedAMResources)) {
sessionInfo.inUse = true;
return sessionInfo.session;
}
@@ -299,11 +253,6 @@ public class TezSessionManager {
synchronized (sessionInfo) {
if (sessionInfo.session == session) {
log.info("Stopping Tez session " + session);
- String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- .format(Calendar.getInstance().getTime());
- System.err.println(timeStamp + " Shutting down Tez session "
- + ", sessionName=" + session.getClientName()
- + ", applicationId=" + session.getAppMasterApplicationId());
session.stop();
sessionToRemove = sessionInfo;
break;
@@ -330,30 +279,19 @@ public class TezSessionManager {
shutdown = true;
for (SessionInfo sessionInfo : sessionPool) {
synchronized (sessionInfo) {
- TezClient session = sessionInfo.session;
try {
- String timeStamp = new SimpleDateFormat(
- "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
- if (session.getAppMasterStatus().equals(
+ if (sessionInfo.session.getAppMasterStatus().equals(
TezAppMasterStatus.SHUTDOWN)) {
log.info("Tez session is already shutdown "
- + session);
- System.err.println(timeStamp
- + " Tez session is already shutdown " + session
- + ", sessionName=" + session.getClientName()
- + ", applicationId=" + session.getAppMasterApplicationId());
+ + sessionInfo.session);
continue;
}
- log.info("Shutting down Tez session " + session);
- // Since hadoop calls org.apache.log4j.LogManager.shutdown();
- // the log.info message is not displayed with shutdown hook in Oozie
- System.err.println(timeStamp + " Shutting down Tez session "
- + ", sessionName=" + session.getClientName()
- + ", applicationId=" + session.getAppMasterApplicationId());
- session.stop();
+ log.info("Shutting down Tez session "
+ + sessionInfo.session);
+ sessionInfo.session.stop();
} catch (Exception e) {
log.error("Error shutting down Tez session "
- + session, e);
+ + sessionInfo.session, e);
}
}
}