You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC
svn commit: r1783988 [9/24] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Wed Feb 22 09:43:41 2017
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
@@ -89,6 +90,8 @@ public class POCast extends ExpressionOp
caster = ((LoadFunc)obj).getLoadCaster();
} else if (obj instanceof StreamToPig) {
caster = ((StreamToPig)obj).getLoadCaster();
+ } else if (obj instanceof EvalFunc) {
+ caster = ((EvalFunc)obj).getLoadCaster();
} else {
throw new IOException("Invalid class type "
+ funcSpec.getClassName());
@@ -165,7 +168,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToBigInteger(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "BigInteger.";
+ String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -281,7 +284,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToBigDecimal(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "BigDecimal.";
+ String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -396,7 +399,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToBoolean(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "boolean.";
+ String msg = unknownByteArrayErrorMessage + "boolean for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -510,7 +513,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToInteger(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "int.";
+ String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -636,7 +639,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToLong(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "long.";
+ String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -759,7 +762,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToDouble(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "double.";
+ String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -881,7 +884,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToFloat(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "float.";
+ String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1007,7 +1010,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToDateTime(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "datetime.";
+ String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1118,7 +1121,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToCharArray(dba.get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "string.";
+ String msg = unknownByteArrayErrorMessage + "string for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1270,7 +1273,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToTuple(dba.get(), fieldSchema);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "tuple.";
+ String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1332,7 +1335,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToBag(((DataByteArray)obj).get(), fs);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "bag.";
+ String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} else {
@@ -1363,7 +1366,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToTuple(((DataByteArray)obj).get(), fs);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "tuple.";
+ String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} else {
@@ -1388,7 +1391,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToMap(((DataByteArray)obj).get(), fs);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "tuple.";
+ String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} else {
@@ -1402,7 +1405,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToBoolean(((DataByteArray) obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "int.";
+ String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1441,7 +1444,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToInteger(((DataByteArray) obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "int.";
+ String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1487,7 +1490,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToDouble(((DataByteArray) obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "double.";
+ String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1533,7 +1536,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToLong(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "long.";
+ String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1579,7 +1582,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToFloat(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "float.";
+ String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1625,7 +1628,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToDateTime(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "datetime.";
+ String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1664,7 +1667,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToCharArray(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "float.";
+ String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1712,7 +1715,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToBigInteger(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "BigInteger.";
+ String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1757,7 +1760,7 @@ public class POCast extends ExpressionOp
result = caster.bytesToBigDecimal(((DataByteArray)obj).get());
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "BigDecimal.";
+ String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
break;
@@ -1795,6 +1798,10 @@ public class POCast extends ExpressionOp
default:
throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
}
+ case DataType.BYTEARRAY:
+ //no-op (PIG-4933)
+ result = obj;
+ break;
default:
throw new ExecException("Don't know how to convert "+ obj + " to " + fs, 1120, PigException.INPUT);
}
@@ -1861,7 +1868,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToBag(dba.get(), fieldSchema);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "bag.";
+ String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
@@ -1952,7 +1959,7 @@ public class POCast extends ExpressionOp
res.result = caster.bytesToMap(dba.get(), fieldSchema);
} else {
int errCode = 1075;
- String msg = unknownByteArrayErrorMessage + "map.";
+ String msg = unknownByteArrayErrorMessage + "map for " + this.getOriginalLocations();
throw new ExecException(msg, errCode, PigException.INPUT);
}
} catch (ExecException ee) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Wed Feb 22 09:43:41 2017
@@ -158,23 +158,19 @@ public class POProject extends Expressio
illustratorMarkup(inpValue, res.result, -1);
return res;
} else if(columns.size() == 1) {
- try {
+ if ( inpValue == null ) {
+ // the tuple is null, so a dereference should also produce a null
+ res.returnStatus = POStatus.STATUS_OK;
+ ret = null;
+ } else if( inpValue.size() > columns.get(0) ) {
ret = inpValue.get(columns.get(0));
- } catch (IndexOutOfBoundsException ie) {
+ } else {
if(pigLogger != null) {
pigLogger.warn(this,"Attempt to access field " +
"which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
}
res.returnStatus = POStatus.STATUS_OK;
ret = null;
- } catch (NullPointerException npe) {
- // the tuple is null, so a dereference should also produce a null
- // there is a slight danger here that the Tuple implementation
- // may have given the exception for a different reason but if we
- // don't catch it, we will die and the most common case for the
- // exception would be because the tuple is null
- res.returnStatus = POStatus.STATUS_OK;
- ret = null;
}
} else if(isProjectToEnd){
ret = getRangeTuple(inpValue);
@@ -215,23 +211,18 @@ public class POProject extends Expressio
*/
private void addColumn(ArrayList<Object> objList, Tuple inpValue, int i)
throws ExecException {
- try {
+ if( inpValue == null ) {
+ // the tuple is null, so a dereference should also produce a null
+ objList.add(null);
+ } else if( inpValue.size() > i ) {
objList.add(inpValue.get(i));
- } catch (IndexOutOfBoundsException ie) {
+ } else {
if(pigLogger != null) {
pigLogger.warn(this,"Attempt to access field " + i +
" which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
}
objList.add(null);
}
- catch (NullPointerException npe) {
- // the tuple is null, so a dereference should also produce a null
- // there is a slight danger here that the Tuple implementation
- // may have given the exception for a different reason but if we
- // don't catch it, we will die and the most common case for the
- // exception would be because the tuple is null
- objList.add(null);
- }
}
@Override
@@ -406,21 +397,17 @@ public class POProject extends Expressio
Object ret;
if(columns.size() == 1) {
- try{
+ if( inpValue == null ) {
+ // the tuple is null, so a dereference should also produce a null
+ ret = null;
+ } else if( inpValue.size() > columns.get(0) ) {
ret = inpValue.get(columns.get(0));
- } catch (IndexOutOfBoundsException ie) {
+ } else {
if(pigLogger != null) {
pigLogger.warn(this,"Attempt to access field " +
"which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
}
ret = null;
- } catch (NullPointerException npe) {
- // the tuple is null, so a dereference should also produce a null
- // there is a slight danger here that the Tuple implementation
- // may have given the exception for a different reason but if we
- // don't catch it, we will die and the most common case for the
- // exception would be because the tuple is null
- ret = null;
}
} else if(isProjectToEnd) {
ret = getRangeTuple(inpValue);
@@ -428,21 +415,17 @@ public class POProject extends Expressio
ArrayList<Object> objList = new ArrayList<Object>(columns.size());
for(int col: columns) {
- try {
+ if( inpValue == null ) {
+ // the tuple is null, so a dereference should also produce a null
+ objList.add(null);
+ } else if( inpValue.size() > col ) {
objList.add(inpValue.get(col));
- } catch (IndexOutOfBoundsException ie) {
+ } else {
if(pigLogger != null) {
pigLogger.warn(this,"Attempt to access field " +
"which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
}
objList.add(null);
- } catch (NullPointerException npe) {
- // the tuple is null, so a dereference should also produce a null
- // there is a slight danger here that the Tuple implementation
- // may have given the exception for a different reason but if we
- // don't catch it, we will die and the most common case for the
- // exception would be because the tuple is null
- objList.add(null);
}
}
ret = mTupleFactory.newTuple(objList);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Wed Feb 22 09:43:41 2017
@@ -49,7 +49,7 @@ public class CombinerPackager extends Pa
private Map<Integer, Integer> keyLookup;
private int numBags;
-
+
private transient boolean initialized;
private transient boolean useDefaultBag;
@@ -77,6 +77,15 @@ public class CombinerPackager extends Pa
}
}
+ @Override
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // Bag can be read directly and need not be materialized again
+ }
+
/**
* @param keyInfo the keyInfo to set
*/
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Wed Feb 22 09:43:41 2017
@@ -17,7 +17,7 @@
*/
/**
- *
+ *
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +49,15 @@ public class LitePackager extends Packag
private PigNullableWritable keyWritable;
@Override
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // Bag can be read directly and need not be materialized again
+ }
+
+ @Override
public boolean[] getInner() {
return null;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Wed Feb 22 09:43:41 2017
@@ -256,4 +256,9 @@ public class POCross extends PhysicalOpe
data = null;
}
+ @Override
+ public void reset() {
+ clearMemory();
+ }
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Wed Feb 22 09:43:41 2017
@@ -97,7 +97,7 @@ public class POFRJoin extends PhysicalOp
// The array of Hashtables one per replicated input. replicates[fragment] =
// null fragment is the input which is fragmented and not replicated.
- protected transient TupleToMapKey replicates[];
+ protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates;
// varaible which denotes whether we are returning tuples from the foreach
// operator
protected transient boolean processingPlan;
@@ -234,7 +234,10 @@ public class POFRJoin extends PhysicalOp
Result res = null;
Result inp = null;
if (!setUp) {
- replicates = new TupleToMapKey[phyPlanLists.size()];
+ replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size());
+ for (int i = 0 ; i < phyPlanLists.size(); i++) {
+ replicates.add(null);
+ }
dumTup = mTupleFactory.newTuple(1);
setUpHashMap();
setUp = true;
@@ -282,8 +285,7 @@ public class POFRJoin extends PhysicalOp
return new Result();
}
Tuple lrOutTuple = (Tuple) lrOut.result;
- Tuple key = mTupleFactory.newTuple(1);
- key.set(0, lrOutTuple.get(1));
+ Object key = lrOutTuple.get(1);
Tuple value = getValueTuple(lr, lrOutTuple);
lr.detachInput();
// Configure the for each operator with the relevant bags
@@ -296,7 +298,7 @@ public class POFRJoin extends PhysicalOp
ce.setValue(value);
continue;
}
- TupleToMapKey replicate = replicates[i];
+ Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i);
if (replicate.get(key) == null) {
if (isLeftOuterJoin) {
ce.setValue(nullBag);
@@ -304,7 +306,7 @@ public class POFRJoin extends PhysicalOp
noMatch = true;
break;
}
- ce.setValue(new NonSpillableDataBag(replicate.get(key).getList()));
+ ce.setValue(new NonSpillableDataBag(replicate.get(key)));
}
// If this is not LeftOuter Join and there was no match we
@@ -327,27 +329,28 @@ public class POFRJoin extends PhysicalOp
}
}
- protected static class TupleToMapKey {
- private HashMap<Tuple, TuplesToSchemaTupleList> tuples;
+ protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> {
private SchemaTupleFactory tf;
public TupleToMapKey(int ct, SchemaTupleFactory tf) {
- tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct);
+ super(ct);
this.tf = tf;
}
- public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) {
- if (tf != null) {
- key = TuplesToSchemaTupleList.convert(key, tf);
+ @Override
+ public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) {
+ if (tf != null && key instanceof Tuple) {
+ key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
}
- return tuples.put(key, val);
+ return (TuplesToSchemaTupleList) super.put(key, val);
}
- public TuplesToSchemaTupleList get(Tuple key) {
- if (tf != null) {
- key = TuplesToSchemaTupleList.convert(key, tf);
+ @Override
+ public TuplesToSchemaTupleList get(Object key) {
+ if (tf != null && key instanceof Tuple) {
+ key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
}
- return tuples.get(key);
+ return (TuplesToSchemaTupleList) super.get(key);
}
}
@@ -382,7 +385,7 @@ public class POFRJoin extends PhysicalOp
SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
if (i == fragment) {
- replicates[i] = null;
+ replicates.set(i, null);
continue;
}
@@ -401,25 +404,34 @@ public class POFRJoin extends PhysicalOp
POLocalRearrange lr = LRs[i];
lr.setInputs(Arrays.asList((PhysicalOperator) ld));
- TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+ Map<Object, ArrayList<Tuple>> replicate;
+ if (keySchemaTupleFactory == null) {
+ replicate = new HashMap<Object, ArrayList<Tuple>>(1000);
+ } else {
+ replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+ }
log.debug("Completed setup. Trying to build replication hash table");
for (Result res = lr.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNextTuple()) {
if (getReporter() != null)
getReporter().progress();
Tuple tuple = (Tuple) res.result;
- if (isKeyNull(tuple.get(1))) continue;
- Tuple key = mTupleFactory.newTuple(1);
- key.set(0, tuple.get(1));
+ Object key = tuple.get(1);
+ if (isKeyNull(key)) continue;
Tuple value = getValueTuple(lr, tuple);
- if (replicate.get(key) == null) {
- replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+ ArrayList<Tuple> values = replicate.get(key);
+ if (values == null) {
+ if (inputSchemaTupleFactory == null) {
+ values = new ArrayList<Tuple>(1);
+ } else {
+ values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
+ }
+ replicate.put(key, values);
}
-
- replicate.get(key).add(value);
+ values.add(value);
}
- replicates[i] = replicate;
+ replicates.set(i, replicate);
}
long time2 = System.currentTimeMillis();
log.debug("Hash Table built. Time taken: " + (time2 - time1));
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Wed Feb 22 09:43:41 2017
@@ -51,7 +51,7 @@ public class POFRJoinSpark extends POFRJ
addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i);
}
- replicates[fragment] = null;
+ replicates.set(fragment, null);
int i = -1;
long start = System.currentTimeMillis();
for (int k = 0; k < inputSchemas.length; ++k) {
@@ -61,7 +61,7 @@ public class POFRJoinSpark extends POFRJ
SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
if (i == fragment) {
- replicates[i] = null;
+ replicates.set(fragment, null);
continue;
}
@@ -91,7 +91,7 @@ public class POFRJoinSpark extends POFRJ
replicate.get(key).add(value);
}
- replicates[i] = replicate;
+ replicates.set(i, replicate);
}
long end = System.currentTimeMillis();
log.debug("Hash Table built. Time taken: " + (end - start));
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Feb 22 09:43:41 2017
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -55,6 +56,7 @@ import org.apache.pig.pen.util.LineageTr
@SuppressWarnings("unchecked")
public class POForEach extends PhysicalOperator {
private static final long serialVersionUID = 1L;
+ private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
protected List<PhysicalPlan> inputPlans;
@@ -264,7 +266,7 @@ public class POForEach extends PhysicalO
if (inp.returnStatus == POStatus.STATUS_EOP) {
if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) {
// continue pull one more output
- inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
+ inp = UNLIMITED_NULL_RESULT;
} else {
return inp;
}
@@ -441,6 +443,8 @@ public class POForEach extends PhysicalO
if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
its[i] = ((DataBag)bags[i]).iterator();
+ } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
+ its[i] = ((Map)bags[i]).entrySet().iterator();
} else {
its[i] = null;
}
@@ -466,7 +470,7 @@ public class POForEach extends PhysicalO
//we instantiate the template array and start populating it with data
data = new Object[noItems];
for(int i = 0; i < noItems; ++i) {
- if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) {
+ if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
if(its[i].hasNext()) {
data[i] = its[i].next();
} else {
@@ -540,6 +544,15 @@ public class POForEach extends PhysicalO
out.append(t.get(j));
}
}
+ } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) {
+ Map.Entry entry = (Map.Entry)in;
+ if (knownSize) {
+ out.set(idx++, entry.getKey());
+ out.set(idx++, entry.getValue());
+ } else {
+ out.append(entry.getKey());
+ out.append(entry.getValue());
+ }
} else {
if (knownSize) {
out.set(idx++, in);
@@ -738,9 +751,12 @@ public class POForEach extends PhysicalO
opsToBeReset.add(sort);
}
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
- */
+ @Override
+ public void visitCross(POCross c) throws VisitorException {
+ // FIXME: add only if limit is present
+ opsToBeReset.add(c);
+ }
+
@Override
public void visitProject(POProject proj) throws VisitorException {
if(proj instanceof PORelationToExprProject) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Wed Feb 22 09:43:41 2017
@@ -56,11 +56,11 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.logical.relational.LOJoin;
-/** This operator implements merge join algorithm to do map side joins.
+/** This operator implements merge join algorithm to do map side joins.
* Currently, only two-way joins are supported. One input of join is identified as left
* and other is identified as right. Left input tuples are the input records in map.
* Right tuples are read from HDFS by opening right stream.
- *
+ *
* This join doesn't support outer join.
* Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
*/
@@ -99,7 +99,7 @@ public class POMergeJoin extends Physica
private FuncSpec rightLoaderFuncSpec;
private String rightInputFileName;
-
+
private String indexFile;
// Buffer to hold accumulated left tuples.
@@ -249,12 +249,11 @@ public class POMergeJoin extends Physica
* from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples
* from the source, though in the future that is what we would like to do.
*/
- public static class TuplesToSchemaTupleList {
- private List<Tuple> tuples;
+ public static class TuplesToSchemaTupleList extends ArrayList<Tuple> {
private SchemaTupleFactory tf;
public TuplesToSchemaTupleList(int ct, TupleMaker<?> tf) {
- tuples = new ArrayList<Tuple>(ct);
+ super(ct);
if (tf instanceof SchemaTupleFactory) {
this.tf = (SchemaTupleFactory)tf;
}
@@ -273,24 +272,24 @@ public class POMergeJoin extends Physica
}
}
+ @Override
public boolean add(Tuple t) {
if (tf != null) {
t = convert(t, tf);
}
- return tuples.add(t);
+ return super.add(t);
}
+ @Override
public Tuple get(int i) {
- return tuples.get(i);
+ return super.get(i);
}
+ @Override
public int size() {
- return tuples.size();
+ return super.size();
}
- public List<Tuple> getList() {
- return tuples;
- }
}
@SuppressWarnings("unchecked")
@@ -357,7 +356,7 @@ public class POMergeJoin extends Physica
}
else{
Object rightKey = extractKeysFromTuple(rightInp, 1);
- if(null == rightKey) // If we see tuple having null keys in stream, we drop them
+ if(null == rightKey) // If we see tuple having null keys in stream, we drop them
continue; // and fetch next tuple.
int cmpval = ((Comparable)rightKey).compareTo(curJoinKey);
@@ -399,7 +398,7 @@ public class POMergeJoin extends Physica
"Last two tuples encountered were: \n"+
curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
throw new ExecException(errMsg,errCode);
- }
+ }
}
}
}
@@ -430,17 +429,17 @@ public class POMergeJoin extends Physica
prevLeftKey+ "\n" + curLeftKey ;
throw new ExecException(errMsg,errCode);
}
-
+
case POStatus.STATUS_EOP:
if(this.parentPlan.endOfAllInput || isEndOfInput()){
- // We hit the end on left input.
+ // We hit the end on left input.
// Tuples in bag may still possibly join with right side.
curJoinKey = prevLeftKey;
curLeftKey = null;
if (isEndOfInput()) {
leftInputConsumedInSpark = true;
}
- break;
+ break;
}
else // Fetch next left input.
return curLeftInp;
@@ -465,7 +464,7 @@ public class POMergeJoin extends Physica
// Accumulated tuples with same key on left side.
// But since we are reading ahead we still haven't checked the read ahead right tuple.
// Accumulated left tuples may potentially join with that. So, lets check that first.
-
+
if((null != prevRightKey) && prevRightKey.equals(prevLeftKey)){
curJoiningRightTup = (Tuple)prevRightInp.result;
@@ -487,17 +486,17 @@ public class POMergeJoin extends Physica
slidingToNextRecord = false;
} else
rightInp = getNextRightInp(prevLeftKey);
-
+
if(rightInp.returnStatus != POStatus.STATUS_OK)
return rightInp;
Object extractedRightKey = extractKeysFromTuple(rightInp, 1);
-
- if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
+
+ if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
continue; // and fetch next tuple.
-
+
Comparable rightKey = (Comparable)extractedRightKey;
-
+
if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
// Sanity check.
int errCode = 1102;
@@ -528,7 +527,7 @@ public class POMergeJoin extends Physica
else{ // We got ahead on right side. Store currently read right tuple.
prevRightKey = rightKey;
prevRightInp = rightInp;
- // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
+ // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
leftTuples = newLeftTupleArray();
leftTuples.add((Tuple)curLeftInp.result);
prevLeftInp = curLeftInp;
@@ -555,7 +554,7 @@ public class POMergeJoin extends Physica
DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
loader.setIndexFile(indexFile);
}
-
+
// Pass signature of the loader to rightLoader
// make a copy of the conf to use in calls to rightLoader.
rightLoader.setUDFContextSignature(signature);
@@ -608,11 +607,11 @@ public class POMergeJoin extends Physica
// run the tuple through the pipeline
rightPipelineRoot.attachInput(t);
return this.getNextRightInp();
-
+
}
default: // We don't deal with ERR/NULL. just pass them down
throwProcessingException(false, null);
-
+
}
}
} catch (IOException e) {
@@ -643,8 +642,8 @@ public class POMergeJoin extends Physica
int errCode = 2167;
String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
throw new ExecException(errMsg,errCode,PigException.BUG);
- }
-
+ }
+
return ((Tuple) lrOut.result).get(1);
}
@@ -660,7 +659,7 @@ public class POMergeJoin extends Physica
noInnerPlanOnRightSide = false;
this.rightPipelineLeaf = rightPipeline.getLeaves().get(0);
this.rightPipelineRoot = rightPipeline.getRoots().get(0);
- this.rightPipelineRoot.setInputs(null);
+ this.rightPipelineRoot.setInputs(null);
}
else
noInnerPlanOnRightSide = true;
@@ -711,18 +710,18 @@ public class POMergeJoin extends Physica
public boolean supportsMultipleOutputs() {
return false;
}
-
+
/**
* @param rightInputFileName the rightInputFileName to set
*/
public void setRightInputFileName(String rightInputFileName) {
this.rightInputFileName = rightInputFileName;
}
-
+
public String getSignature() {
return signature;
}
-
+
public void setSignature(String signature) {
this.signature = signature;
}
@@ -734,12 +733,12 @@ public class POMergeJoin extends Physica
public String getIndexFile() {
return indexFile;
}
-
+
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
return null;
}
-
+
public LOJoin.JOINTYPE getJoinType() {
return joinType;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Wed Feb 22 09:43:41 2017
@@ -44,6 +44,9 @@ public class POPoissonSample extends Phy
private transient boolean initialized;
+ // num of rows skipped so far
+ private transient int numSkipped;
+
// num of rows sampled so far
private transient int numRowsSampled;
@@ -89,6 +92,7 @@ public class POPoissonSample extends Phy
@Override
public Result getNextTuple() throws ExecException {
if (!initialized) {
+ numSkipped = 0;
numRowsSampled = 0;
avgTupleMemSz = 0;
rowNum = 0;
@@ -134,7 +138,7 @@ public class POPoissonSample extends Phy
}
// skip tuples
- for (long numSkipped = 0; numSkipped < skipInterval; numSkipped++) {
+ while (numSkipped < skipInterval) {
res = processInput();
if (res.returnStatus == POStatus.STATUS_NULL) {
continue;
@@ -148,6 +152,7 @@ public class POPoissonSample extends Phy
return res;
}
rowNum++;
+ numSkipped++;
}
// skipped enough, get new sample
@@ -173,6 +178,8 @@ public class POPoissonSample extends Phy
rowNum++;
newSample = res;
+ // reset skipped
+ numSkipped = 0;
return currentSample;
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Wed Feb 22 09:43:41 2017
@@ -125,7 +125,7 @@ public class POReservoirSample extends P
}
// collect samples until input is exhausted
- int rand = randGen.nextInt(rowProcessed);
+ int rand = randGen.nextInt(rowProcessed + 1);
if (rand < numSamples) {
samples[rand] = res;
}
@@ -133,8 +133,13 @@ public class POReservoirSample extends P
}
}
- if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) {
- sampleCollectionDone = true;
+ if (res.returnStatus == POStatus.STATUS_EOP) {
+ if (this.parentPlan.endOfAllInput) {
+ sampleCollectionDone = true;
+ } else {
+ // In case of Split can get EOP in between.
+ return res;
+ }
}
return getSample();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Wed Feb 22 09:43:41 2017
@@ -51,13 +51,13 @@ public class Packager implements Illustr
protected DataBag[] bags;
public static enum PackageType {
- GROUP, JOIN
+ GROUP, JOIN, BLOOMJOIN
};
protected transient Illustrator illustrator = null;
// The key being worked on
- Object key;
+ protected Object key;
// marker to indicate if key is a tuple
protected boolean isKeyTuple = false;
@@ -65,7 +65,7 @@ public class Packager implements Illustr
protected boolean isKeyCompound = false;
// key's type
- byte keyType;
+ protected byte keyType;
// The number of inputs to this
// co-group. 0 indicates a distinct, which means there will only be a
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java Wed Feb 22 09:43:41 2017
@@ -60,7 +60,7 @@ public class StoreFuncDecorator {
private boolean allowErrors() {
return UDFContext.getUDFContext().getJobConf()
- .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false);
+ .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, false);
}
/**
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Feb 22 09:43:41 2017
@@ -162,8 +162,13 @@ public class LoadConverter implements RD
private SparkEngineConf sparkEngineConf;
private boolean initialized;
+ //LoadConverter#ToTupleFunction is executed more than once in multiquery case causing
+ //invalid number of input records, 'skip' flag below indicates first load is finished.
+ private boolean skip;
+
public ToTupleFunction(SparkEngineConf sparkEngineConf){
this.sparkEngineConf = sparkEngineConf;
+
}
@Override
@@ -172,9 +177,14 @@ public class LoadConverter implements RD
long partitionId = TaskContext.get().partitionId();
PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId));
+ //We're in POSplit and already counted all input records,
+ //in a multiquery case skip will be set to true after the first load is finished:
+ if (sparkCounters != null && SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName, counterName).getValue() > 0) {
+ skip=true;
+ }
initialized = true;
}
- if (sparkCounters != null && disableCounter == false) {
+ if (sparkCounters != null && disableCounter == false && skip == false) {
sparkCounters.increment(counterGroupName, counterName, 1L);
}
return v1._2();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Feb 22 09:43:41 2017
@@ -19,13 +19,14 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -56,6 +58,7 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
@@ -87,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -108,7 +110,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
-import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
@@ -174,6 +175,7 @@ public class TezDagBuilder extends TezOp
private PigContext pc;
private Configuration globalConf;
private Configuration pigContextConf;
+ private Configuration shuffleVertexManagerBaseConf;
private FileSystem fs;
private long intermediateTaskInputSize;
private Set<String> inputSplitInDiskVertices;
@@ -191,6 +193,8 @@ public class TezDagBuilder extends TezOp
private String mapTaskLaunchCmdOpts;
private String reduceTaskLaunchCmdOpts;
+ private boolean disableDAGRecovery = false;
+
public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
Map<String, LocalResource> localResources) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
@@ -210,6 +214,10 @@ public class TezDagBuilder extends TezOp
}
}
+ public boolean shouldDisableDAGRecovery() {
+ return disableDAGRecovery;
+ }
+
private void initialize(PigContext pc) throws IOException {
this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
@@ -217,6 +225,16 @@ public class TezDagBuilder extends TezOp
this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
MRToTezHelper.processMRSettings(pigContextConf, globalConf);
+ shuffleVertexManagerBaseConf = new Configuration(false);
+ // Only copy tez.shuffle-vertex-manager config to keep payload size small
+ Iterator<Entry<String, String>> iter = pigContextConf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) {
+ shuffleVertexManagerBaseConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
// Add credentials from binary token file and get tokens for namenodes
// specified in mapreduce.job.hdfs-servers
SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
@@ -265,7 +283,7 @@ public class TezDagBuilder extends TezOp
if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) {
// If tez setting is not defined
MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
- MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true);
+ MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, false);
}
if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) {
@@ -279,7 +297,7 @@ public class TezDagBuilder extends TezOp
try {
fs = FileSystem.get(globalConf);
- intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc));
+ intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc));
} catch (Exception e) {
log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
intermediateTaskInputSize = 134217728L;
@@ -397,7 +415,11 @@ public class TezDagBuilder extends TezOp
tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup);
POStore store = tezOp.getVertexGroupInfo().getStore();
if (store != null) {
- vertexGroup.addDataSink(store.getOperatorKey().toString(),
+ String outputKey = store.getOperatorKey().toString();
+ if (store instanceof POStoreTez) {
+ outputKey = ((POStoreTez) store).getOutputKey();
+ }
+ vertexGroup.addDataSink(outputKey,
DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials()));
}
@@ -441,7 +463,14 @@ public class TezDagBuilder extends TezOp
Configuration conf = new Configuration(pigContextConf);
- if (!combinePlan.isEmpty()) {
+ if (edge.needsDistinctCombiner()) {
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS,
+ MRCombiner.class.getName());
+ conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
+ DistinctCombiner.Combine.class.getName());
+ log.info("Setting distinct combiner class between "
+ + from.getOperatorKey() + " and " + to.getOperatorKey());
+ } else if (!combinePlan.isEmpty()) {
udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC);
addCombiner(combinePlan, to, conf, isMergedInput);
}
@@ -450,7 +479,7 @@ public class TezDagBuilder extends TezOp
POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
- if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ if (lr.containsOutputKey(to.getOperatorKey().toString())) {
byte keyType = lr.getKeyType();
setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
// In case of secondary key sort, main key type is the actual key type
@@ -479,7 +508,8 @@ public class TezDagBuilder extends TezOp
conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
- conf.set("pig.pigContext", serializedPigContext);
+ conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
+ conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
conf.set("udf.import.list", serializedUDFImportList);
if(to.isGlobalSort() || to.isLimitAfterSort()){
@@ -510,26 +540,36 @@ public class TezDagBuilder extends TezOp
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
out.setUserPayload(payLoad);
+ in.setUserPayload(payLoad);
+ // Remove combiner and reset payload
if (!combinePlan.isEmpty()) {
boolean noCombineInReducer = false;
+ boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap();
String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
- if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+ if (edge.getCombinerInReducer() != null) {
+ noCombineInReducer = !edge.getCombinerInReducer();
+ } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
} else {
noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
}
- if (noCombineInReducer) {
+ if (noCombineInReducer || noCombineInMapper) {
log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
conf.unset("pig.combinePlan");
conf.unset("pig.combine.package");
conf.unset("pig.map.keytype");
- payLoad = TezUtils.createUserPayloadFromConf(conf);
+ UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf);
+ if (noCombineInMapper) {
+ out.setUserPayload(payLoadWithoutCombiner);
+ }
+ if (noCombineInReducer) {
+ in.setUserPayload(payLoadWithoutCombiner);
+ }
}
}
- in.setUserPayload(payLoad);
if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
@@ -593,6 +633,8 @@ public class TezDagBuilder extends TezOp
setOutputFormat(job);
payloadConf.set("udf.import.list", serializedUDFImportList);
payloadConf.set("exectype", "TEZ");
+ payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
+ payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
// Process stores
LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -611,11 +653,7 @@ public class TezDagBuilder extends TezOp
payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
inputPayLoad = new Configuration(payloadConf);
- if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) {
- inputPayLoad.set("pig.pigContext", serializedPigContext);
- }
}
- payloadConf.set("pig.pigContext", serializedPigContext);
if (tezOp.getSampleOperator() != null) {
payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
@@ -689,7 +727,7 @@ public class TezDagBuilder extends TezOp
PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
if (lr.isConnectedToPackage()
- && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+ && lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
localRearrangeMap.put((int) lr.getIndex(), inputKey);
if (isVertexGroup) {
isMergedInput = true;
@@ -772,9 +810,25 @@ public class TezDagBuilder extends TezOp
String vmPluginName = null;
Configuration vmPluginConf = null;
+ boolean containScatterGather = false;
+ boolean containCustomPartitioner = false;
+ for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+ if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
+ containScatterGather = true;
+ }
+ if (edge.partitionerClass != null) {
+ containCustomPartitioner = true;
+ }
+ }
+
+ if(containScatterGather) {
+ vmPluginName = ShuffleVertexManager.class.getName();
+ vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
+ }
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
+ boolean autoParallelism = false;
if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
if (tezOp.getVertexParallelism()==-1 && (
tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1||
@@ -783,33 +837,12 @@ public class TezDagBuilder extends TezOp
// to decrease/increase parallelism of sorting vertex dynamically
// based on the numQuantiles calculated by sample aggregation vertex
vmPluginName = PartitionerDefinedVertexManager.class.getName();
+ autoParallelism = true;
log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
}
} else {
- boolean containScatterGather = false;
- boolean containCustomPartitioner = false;
- for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
- if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
- containScatterGather = true;
- }
- if (edge.partitionerClass!=null) {
- containCustomPartitioner = true;
- }
- }
if (containScatterGather && !containCustomPartitioner) {
- vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf;
- // Use auto-parallelism feature of ShuffleVertexManager to dynamically
- // reduce the parallelism of the vertex
- if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
- && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
- vmPluginName = PigGraceShuffleVertexManager.class.getName();
- tezOp.setUseGraceParallelism(true);
- vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
- vmPluginConf.set("pig.pigContext", serializedPigContext);
- } else {
- vmPluginName = ShuffleVertexManager.class.getName();
- }
- vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+
// For Intermediate reduce, set the bytes per reducer to be block size.
long bytesPerReducer = intermediateTaskInputSize;
// If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user.
@@ -818,8 +851,8 @@ public class TezDagBuilder extends TezOp
// In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce
// as map input sizes are mostly always high compared to map output.
if (stores.size() > 0) {
- if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
- bytesPerReducer = vmPluginConf.getLong(
+ if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+ bytesPerReducer = pigContextConf.getLong(
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
} else if (tezOp.isGroupBy()) {
@@ -828,10 +861,28 @@ public class TezDagBuilder extends TezOp
bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
}
}
+
+ // Use auto-parallelism feature of ShuffleVertexManager to dynamically
+ // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager
+ // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on
+ if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+ && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
+ && tezOp.getCrossKeys() == null) {
+ vmPluginName = PigGraceShuffleVertexManager.class.getName();
+ tezOp.setUseGraceParallelism(true);
+ vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
+ vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
+ vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer);
+ }
+ vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
+ autoParallelism = true;
log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
}
}
+ if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) {
+ disableDAGRecovery = true;
+ }
}
if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
@@ -1409,22 +1460,12 @@ public class TezDagBuilder extends TezOp
private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
// the OutputFormat we report to Hadoop is always PigOutputFormat which
- // can be wrapped with LazyOutputFormat provided if it is supported by
- // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
- try {
- Class<?> clazz = PigContext
- .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
- Method method = clazz.getMethod("setOutputFormatClass",
- org.apache.hadoop.mapreduce.Job.class, Class.class);
- method.invoke(null, job, PigOutputFormatTez.class);
- } catch (Exception e) {
- job.setOutputFormatClass(PigOutputFormatTez.class);
- log.warn(PigConfiguration.PIG_OUTPUT_LAZY
- + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
- }
+ LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class);
} else {
job.setOutputFormatClass(PigOutputFormatTez.class);
}
}
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Wed Feb 22 09:43:41 2017
@@ -30,6 +30,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.tez.client.TezClient;
@@ -51,7 +56,7 @@ import com.google.common.collect.Maps;
*/
public class TezJob implements Runnable {
private static final Log log = LogFactory.getLog(TezJob.class);
- private Configuration conf;
+ private TezConfiguration conf;
private EnumSet<StatusGetOpts> statusGetOpts;
private Map<String, LocalResource> requestAMResources;
private ApplicationId appId;
@@ -69,31 +74,71 @@ public class TezJob implements Runnable
public TezJob(TezConfiguration conf, DAG dag,
Map<String, LocalResource> requestAMResources,
- int estimatedTotalParallelism) throws IOException {
+ TezOperPlan tezPlan) throws IOException {
this.conf = conf;
this.dag = dag;
this.requestAMResources = requestAMResources;
this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- tezJobConf = new TezJobConfig(estimatedTotalParallelism);
+ tezJobConf = new TezJobConfig(tezPlan);
}
static class TezJobConfig {
private int estimatedTotalParallelism = -1;
+ private int maxOutputsinSingleVertex;
+ private int totalVertices = 0;
- public TezJobConfig(int estimatedTotalParallelism) {
- this.estimatedTotalParallelism = estimatedTotalParallelism;
+ public TezJobConfig(TezOperPlan tezPlan) throws VisitorException {
+ this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism();
+ MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan);
+ finder.visit();
+ this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex();
+ this.totalVertices = finder.getTotalVertices();
}
public int getEstimatedTotalParallelism() {
return estimatedTotalParallelism;
}
- public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
- this.estimatedTotalParallelism = estimatedTotalParallelism;
+ public int getMaxOutputsinSingleVertex() {
+ return maxOutputsinSingleVertex;
}
+ public int getTotalVertices() {
+ return totalVertices;
+ }
+
+ }
+
+ private static class MaxOutputsFinder extends TezOpPlanVisitor {
+
+ private int maxOutputsinSingleVertex = 1;
+ private int totalVertices = 0;
+
+ public MaxOutputsFinder(TezOperPlan plan) {
+ super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ }
+
+ public int getMaxOutputsinSingleVertex() {
+ return maxOutputsinSingleVertex;
+ }
+
+ public int getTotalVertices() {
+ return totalVertices;
+ }
+
+ @Override
+ public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+ if (!tezOperator.isVertexGroup()) {
+ totalVertices++;
+ int outputs = tezOperator.outEdges.keySet().size();
+ maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs ? maxOutputsinSingleVertex : outputs;
+ }
+ }
+
+
+
}
public DAG getDAG() {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Wed Feb 22 09:43:41 2017
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.PigATSClient;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
@@ -50,11 +52,12 @@ public class TezJobCompiler {
private static final Log log = LogFactory.getLog(TezJobCompiler.class);
private PigContext pigContext;
- private TezConfiguration tezConf;
+ private Configuration conf;
+ private boolean disableDAGRecovery;
public TezJobCompiler(PigContext pigContext, Configuration conf) throws IOException {
this.pigContext = pigContext;
- this.tezConf = new TezConfiguration(conf);
+ this.conf = conf;
}
public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
@@ -64,6 +67,7 @@ public class TezJobCompiler {
TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
dagBuilder.visit();
dagBuilder.avoidContainerReuseIfInputSplitInDisk();
+ disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery();
return tezDag;
}
@@ -85,6 +89,7 @@ public class TezJobCompiler {
return job;
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
throws JobCreationException {
try {
@@ -107,8 +112,34 @@ public class TezJobCompiler {
}
DAG tezDag = buildDAG(tezPlanNode, localResources);
tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+ // set Tez caller context
+ // Reflection for the following code since it is only available since tez 0.8.1:
+ // CallerContext context = CallerContext.create(ATSService.CallerContext, ATSService.getPigAuditId(pigContext),
+ // ATSService.EntityType, "");
+ // tezDag.setCallerContext(context);
+ Class callerContextClass = null;
+ try {
+ callerContextClass = Class.forName("org.apache.tez.client.CallerContext");
+ } catch (ClassNotFoundException e) {
+ // If pre-Tez 0.8.1, skip setting CallerContext
+ }
+ if (callerContextClass != null) {
+ Method builderBuildMethod = callerContextClass.getMethod("create", String.class,
+ String.class, String.class, String.class);
+ Object context = builderBuildMethod.invoke(null, PigATSClient.CALLER_CONTEXT,
+ PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, "");
+ Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext",
+ context.getClass());
+ dagSetCallerContext.invoke(tezDag, context);
+ }
log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
- return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
+ TezConfiguration tezConf = new TezConfiguration(conf);
+ if (disableDAGRecovery
+ && tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
+ TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+ tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+ }
+ return new TezJob(tezConf, tezDag, localResources, tezPlan);
} catch (Exception e) {
int errCode = 2017;
String msg = "Internal error creating job configuration.";