You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC
svn commit: r1784224 [6/17] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Feb 24 03:34:37 2017
@@ -32,12 +32,10 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.hash.Hash;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
@@ -46,10 +44,8 @@ import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -86,10 +82,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
@@ -117,7 +110,6 @@ import org.apache.pig.impl.builtin.GetMe
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
@@ -175,10 +167,6 @@ public class TezCompiler extends PhyPlan
private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
- // Contains the inputs to operator like join, with the list maintaining the
- // same order of join from left to right
- private Map<TezOperator, List<TezOperator>> inputsMap;
-
public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
@@ -187,8 +175,6 @@ public class TezCompiler extends PhyPlan
private boolean optimisticFileConcatenation = false;
private List<String> readOnceLoadFuncs = null;
- private Configuration conf;
-
private POLocalRearrangeTezFactory localRearrangeFactory;
public TezCompiler(PhysicalPlan plan, PigContext pigContext)
@@ -198,7 +184,6 @@ public class TezCompiler extends PhyPlan
this.pigContext = pigContext;
pigProperties = pigContext.getProperties();
- conf = ConfigurationUtil.toConfiguration(pigProperties, false);
splitsSeen = Maps.newHashMap();
tezPlan = new TezOperPlan();
nig = NodeIdGenerator.getGenerator();
@@ -212,7 +197,6 @@ public class TezCompiler extends PhyPlan
scope = roots.get(0).getOperatorKey().getScope();
localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
phyToTezOpMap = Maps.newHashMap();
- inputsMap = Maps.newHashMap();
fileConcatenationThreshold = Integer.parseInt(pigProperties
.getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
@@ -671,8 +655,15 @@ public class TezCompiler extends PhyPlan
blocking();
TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
- TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey());
- edge.setNeedsDistinctCombiner(true);
+ // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented
+ // with a global variable and a specific DistinctCombiner class. This seems better.
+ PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan;
+ addDistinctPlan(combinePlan, 1);
+
+ POLocalRearrangeTez clr = localRearrangeFactory.create();
+ clr.setOutputKey(curTezOp.getOperatorKey().toString());
+ clr.setDistinct(true);
+ combinePlan.addAsLeaf(clr);
curTezOp.markDistinct();
addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
@@ -865,7 +856,6 @@ public class TezCompiler extends PhyPlan
} else {
curTezOp.plan.addAsLeaf(op);
}
- phyToTezOpMap.put(op, curTezOp);
} catch (Exception e) {
int errCode = 2034;
@@ -910,7 +900,6 @@ public class TezCompiler extends PhyPlan
public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
try {
blocking();
- inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs)));
TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
curTezOp.setRequestedParallelism(op.getRequestedParallelism());
if (op.isCross()) {
@@ -1099,7 +1088,7 @@ public class TezCompiler extends PhyPlan
indexerTezOp.setDontEstimateParallelism(true);
POStore st = TezCompilerUtil.getStore(scope, nig);
- FileSpec strFile = getTempFileSpec(pigContext);
+ FileSpec strFile = getTempFileSpec();
st.setSFile(strFile);
indexAggrOper.plan.addAsLeaf(st);
indexAggrOper.setClosed(true);
@@ -1266,7 +1255,7 @@ public class TezCompiler extends PhyPlan
rightTezOprAggr.setDontEstimateParallelism(true);
POStore st = TezCompilerUtil.getStore(scope, nig);
- FileSpec strFile = getTempFileSpec(pigContext);
+ FileSpec strFile = getTempFileSpec();
st.setSFile(strFile);
rightTezOprAggr.plan.addAsLeaf(st);
rightTezOprAggr.setClosed(true);
@@ -1357,9 +1346,6 @@ public class TezCompiler extends PhyPlan
} else if (op.getNumInps() > 1) {
curTezOp.markCogroup();
}
- } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
- curTezOp.markRegularJoin();
- addBloomToJoin(op, curTezOp);
}
} catch (Exception e) {
int errCode = 2034;
@@ -1368,132 +1354,6 @@ public class TezCompiler extends PhyPlan
}
}
- private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException {
-
- List<TezOperator> inputs = inputsMap.get(curTezOp);
- TezOperator buildBloomOp;
- List<TezOperator> applyBloomOps = new ArrayList<>();
-
- String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY);
- boolean createBloomInMap = "map".equals(strategy);
- if (!createBloomInMap && !strategy.equals("reduce")) {
- throw new PlanException(new IllegalArgumentException(
- "Invalid value for "
- + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " - "
- + strategy + ". Valid values are map and reduce"));
- }
- int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS);
- int vectorSizeBytes = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES);
- int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf);
- int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE));
-
- // We build bloom of the right most input and apply the bloom filter on the left inputs by default.
- // But in case of left outer join we build bloom of the left input and use it on the right input
- boolean[] inner = op.getPkgr().getInner();
- boolean skipNullKeys = true;
- if (inner[inner.length - 1]) { // inner has from right to left while inputs has from left to right
- buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input
- for (int i = 0; i < (inner.length - 1); i++) {
- applyBloomOps.add(inputs.get(i));
- }
- skipNullKeys = inner[0];
- } else {
- // Left outer join
- skipNullKeys = false;
- buildBloomOp = inputs.get(0); // Bloom filter is built from left most input
- for (int i = 1; i < inner.length; i++) {
- applyBloomOps.add(inputs.get(i));
- }
- }
-
- // Add BuildBloom operator to the input
- POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0);
- POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
- bbr.setSkipNullKeys(skipNullKeys);
- buildBloomOp.plan.remove(lr);
- buildBloomOp.plan.addAsLeaf(bbr);
-
- // Add a new reduce vertex that will construct the final bloom filter
- // - by combining the bloom filters from the buildBloomOp input tasks in the map strategy
- // - or directly from the keys from the buildBloomOp input tasks in the reduce strategy
- TezOperator combineBloomOp = getTezOp();
- tezPlan.add(combineBloomOp);
- combineBloomOp.markBuildBloom();
- // Explicitly set the parallelism for the new vertex to number of bloom filters.
- // Auto parallelism will bring it down based on the actual output size
- combineBloomOp.setEstimatedParallelism(numBloomFilters);
- // We don't want parallelism to be changed during the run by grace auto parallelism
- // It will take the whole input size and estimate way higher
- combineBloomOp.setDontEstimateParallelism(true);
-
- String combineBloomOpKey = combineBloomOp.getOperatorKey().toString();
- TezEdgeDescriptor edge = new TezEdgeDescriptor();
- TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge);
- bbr.setBloomOutputKey(combineBloomOpKey);
-
-
- POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
- pkg.setNumInps(1);
- BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);;
- pkgr.setKeyType(DataType.INTEGER);
- pkg.setPkgr(pkgr);
- POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
- combineBloomOp.plan.addAsLeaf(pkg);
- combineBloomOp.plan.addAsLeaf(combineBloomOutput);
-
- edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
- edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
-
- // Add combiner as well.
- POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
- BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
- combinerPkgr.setCombiner(true);
- combinerPkgr.setKeyType(DataType.INTEGER);
- pkg_c.setPkgr(combinerPkgr);
- pkg_c.setNumInps(1);
- edge.combinePlan.addAsLeaf(pkg_c);
- POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
- prjKey.setResultType(DataType.INTEGER);
- List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
- PhysicalPlan pp = new PhysicalPlan();
- pp.add(prjKey);
- clrInps.add(pp);
- POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
- clr.setOutputKey(combineBloomOpKey);
- edge.combinePlan.addAsLeaf(clr);
-
- if (createBloomInMap) {
- // No combiner needed on map as there will be only one bloom filter per map for each partition
- // In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager
- edge.setCombinerInMap(false);
- edge.setCombinerInReducer(true);
- } else {
- pkgr.setBloomKeyType(op.getPkgr().getKeyType());
- // Do distinct of the keys on the map side to reduce data sent to reducers.
- // In case of reduce, not adding a combiner and doing the distinct during reduce itself.
- // If needed one can be added later
- edge.setCombinerInMap(true);
- edge.setCombinerInReducer(false);
- }
-
- // Broadcast the final bloom filter to other inputs
- for (TezOperator applyBloomOp : applyBloomOps) {
- applyBloomOp.markFilterBloom();
- lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0);
- POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, numBloomFilters);
- applyBloomOp.plan.remove(lr);
- applyBloomOp.plan.addAsLeaf(bfr);
- bfr.setInputKey(combineBloomOpKey);
- edge = new TezEdgeDescriptor();
- edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
- edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
- TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
- TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, edge);
- combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString());
- }
-
- }
-
@Override
public void visitPOForEach(POForEach op) throws VisitorException{
try{
@@ -1653,7 +1513,7 @@ public class TezCompiler extends PhyPlan
for (int i=0; i<transformPlans.size(); i++) {
eps1.add(transformPlans.get(i));
- flat1.add(i == transformPlans.size() - 1 ? true : false);
+ flat1.add(true);
}
// This foreach will pick the sort key columns from the POPoissonSample output
@@ -1862,7 +1722,7 @@ public class TezCompiler extends PhyPlan
* @return
* @throws IOException
*/
- public static FileSpec getTempFileSpec(PigContext pigContext) throws IOException {
+ private FileSpec getTempFileSpec() throws IOException {
return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Fri Feb 24 03:34:37 2017
@@ -31,13 +31,8 @@ import org.apache.tez.runtime.library.ou
* Descriptor for Tez edge. It holds combine plan as well as edge properties.
*/
public class TezEdgeDescriptor implements Serializable {
-
- public transient PhysicalPlan combinePlan;
- private boolean needsDistinctCombiner;
- // Combiner runs on both input and output of Tez edge by default
- // It can be configured to run only in output(map) or input(reduce)
- private Boolean combinerInMap;
- private Boolean combinerInReducer;
+ // Combiner runs on both input and output of Tez edge.
+ transient public PhysicalPlan combinePlan;
public String inputClassName;
public String outputClassName;
@@ -70,30 +65,6 @@ public class TezEdgeDescriptor implement
dataMovementType = DataMovementType.SCATTER_GATHER;
}
- public boolean needsDistinctCombiner() {
- return needsDistinctCombiner;
- }
-
- public void setNeedsDistinctCombiner(boolean nic) {
- needsDistinctCombiner = nic;
- }
-
- public Boolean getCombinerInMap() {
- return combinerInMap;
- }
-
- public void setCombinerInMap(Boolean combinerInMap) {
- this.combinerInMap = combinerInMap;
- }
-
- public Boolean getCombinerInReducer() {
- return combinerInReducer;
- }
-
- public void setCombinerInReducer(Boolean combinerInReducer) {
- this.combinerInReducer = combinerInReducer;
- }
-
public boolean isUseSecondaryKey() {
return useSecondaryKey;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Fri Feb 24 03:34:37 2017
@@ -25,9 +25,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -218,12 +217,8 @@ public class TezOperPlan extends Operato
newPlan.add(node);
}
- // Using a LinkedHashSet and doing a sort so that
- // test plan printed remains same between jdk7 and jdk8
- Set<Pair<TezOperator, TezOperator>> toReconnect = new LinkedHashSet<Pair<TezOperator, TezOperator>>();
- List<TezOperator> fromEdges = new ArrayList<>(mFromEdges.keySet());
- Collections.sort(fromEdges);
- for (TezOperator from : fromEdges) {
+ Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>();
+ for (TezOperator from : mFromEdges.keySet()) {
List<TezOperator> tos = mFromEdges.get(from);
for (TezOperator to : tos) {
if (list.contains(from) || list.contains(to)) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Feb 24 03:34:37 2017
@@ -181,11 +181,7 @@ public class TezOperator extends Operato
// Indicate if this job is a native job
NATIVE,
// Indicate if this job does rank counter
- RANK_COUNTER,
- // Indicate if this job constructs bloom filter
- BUILDBLOOM,
- // Indicate if this job applies bloom filter
- FILTERBLOOM;
+ RANK_COUNTER;
};
// Features in the job/vertex. Mostly will be only one feature.
@@ -239,7 +235,6 @@ public class TezOperator extends Operato
}
private LoaderInfo loaderInfo = new LoaderInfo();
- private long totalInputFilesSize = -1;
public TezOperator(OperatorKey k) {
super(k);
@@ -457,22 +452,6 @@ public class TezOperator extends Operato
feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
}
- public boolean isBuildBloom() {
- return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal());
- }
-
- public void markBuildBloom() {
- feature.set(OPER_FEATURE.BUILDBLOOM.ordinal());
- }
-
- public boolean isFilterBloom() {
- return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal());
- }
-
- public void markFilterBloom() {
- feature.set(OPER_FEATURE.FILTERBLOOM.ordinal());
- }
-
public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
for (OPER_FEATURE opf : OPER_FEATURE.values()) {
if (excludeFeatures != null && excludeFeatures.contains(opf)) {
@@ -672,14 +651,6 @@ public class TezOperator extends Operato
return loaderInfo;
}
- public long getTotalInputFilesSize() {
- return totalInputFilesSize;
- }
-
- public void setTotalInputFilesSize(long totalInputFilesSize) {
- this.totalInputFilesSize = totalInputFilesSize;
- }
-
public void setUseGraceParallelism(boolean useGraceParallelism) {
this.useGraceParallelism = useGraceParallelism;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Fri Feb 24 03:34:37 2017
@@ -31,7 +31,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -162,7 +161,7 @@ public class TezPOPackageAnnotator exten
@Override
public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
- if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) {
+ if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) {
return;
}
loRearrangeFound++;
@@ -181,9 +180,7 @@ public class TezPOPackageAnnotator exten
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
- // For BloomPackager there is only one input, but the
- // POBuildBloomRearrangeTez index is that of the join's index and can be non-zero
- Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : Integer.valueOf(lrearrange.getIndex());
+ Integer index = Integer.valueOf(lrearrange.getIndex());
if(keyInfo.get(index) != null) {
if (isPOSplit) {
// Case of POSplit having more than one input in case of self join or union
@@ -200,20 +197,12 @@ public class TezPOPackageAnnotator exten
}
- if (pkg.getPkgr() instanceof BloomPackager ) {
- keyInfo.put(index,
- new Pair<Boolean, Map<Integer, Integer>>(
- Boolean.FALSE, new HashMap<Integer, Integer>()));
- pkg.getPkgr().setKeyInfo(keyInfo);
- } else {
- keyInfo.put(index,
- new Pair<Boolean, Map<Integer, Integer>>(
- lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
- pkg.getPkgr().setKeyInfo(keyInfo);
- pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
- pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
- }
-
+ keyInfo.put(index,
+ new Pair<Boolean, Map<Integer, Integer>>(
+ lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+ pkg.getPkgr().setKeyInfo(keyInfo);
+ pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+ pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
}
/**
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Feb 24 03:34:37 2017
@@ -29,14 +29,9 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -165,178 +160,100 @@ public class TezPlanContainer extends Op
return;
}
- List<TezOperator> opersToSegment = null;
+ TezOperator operToSegment = null;
+ List<TezOperator> succs = new ArrayList<TezOperator>();
try {
// Split top down from root to leaves
- // Get list of operators closer to the root that can be segmented together
- FirstLevelSegmentOperatorsFinder finder = new FirstLevelSegmentOperatorsFinder(tezOperPlan);
+ SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan);
finder.visit();
- opersToSegment = finder.getOperatorsToSegment();
+ operToSegment = finder.getOperatorToSegment();
} catch (VisitorException e) {
throw new PlanException(e);
}
- if (!opersToSegment.isEmpty()) {
- Set<TezOperator> commonSplitterPredecessors = new HashSet<>();
- for (TezOperator operToSegment : opersToSegment) {
- for (TezOperator succ : tezOperPlan.getSuccessors(operToSegment)) {
- commonSplitterPredecessors
- .addAll(getCommonSplitterPredecessors(tezOperPlan,
- operToSegment, succ));
- }
- }
- if (commonSplitterPredecessors.isEmpty()) {
- List<TezOperator> allSuccs = new ArrayList<TezOperator>();
- // Disconnect all the successors and move them to a new plan
- for (TezOperator operToSegment : opersToSegment) {
- List<TezOperator> succs = new ArrayList<TezOperator>();
- succs.addAll(tezOperPlan.getSuccessors(operToSegment));
- allSuccs.addAll(succs);
- for (TezOperator succ : succs) {
- tezOperPlan.disconnect(operToSegment, succ);
+ if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) {
+ succs.addAll(tezOperPlan.getSuccessors(operToSegment));
+ for (TezOperator succ : succs) {
+ tezOperPlan.disconnect(operToSegment, succ);
+ }
+ for (TezOperator succ : succs) {
+ try {
+ if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) {
+ // Has already been moved to a new plan by previous successor
+ // as part of dependency. It could have been further split.
+ // So walk the full plan to find the new plan and connect
+ TezOperatorFinder finder = new TezOperatorFinder(this, succ);
+ finder.visit();
+ connect(planNode, finder.getPlanContainerNode());
+ continue;
}
- }
- TezOperPlan newOperPlan = new TezOperPlan();
- for (TezOperator succ : allSuccs) {
+ TezOperPlan newOperPlan = new TezOperPlan();
tezOperPlan.moveTree(succ, newOperPlan);
- }
- TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
- generateNodeOperatorKey(), newOperPlan);
- add(newPlanNode);
- connect(planNode, newPlanNode);
- split(newPlanNode);
- } else {
- // If there is a common splitter predecessor between operToSegment and the successor,
- // we have to separate out that split to be able to segment.
- // So we store the output of split to a temp store and then change the
- // splittees to load from it.
- String scope = opersToSegment.get(0).getOperatorKey().getScope();
- for (TezOperator splitter : commonSplitterPredecessors) {
- try {
- List<TezOperator> succs = new ArrayList<TezOperator>();
- succs.addAll(tezOperPlan.getSuccessors(splitter));
- FileSpec fileSpec = TezCompiler.getTempFileSpec(pigContext);
- POStore tmpStore = getTmpStore(scope, fileSpec);
- // Replace POValueOutputTez with POStore
- splitter.plan.remove(splitter.plan.getLeaves().get(0));
- splitter.plan.addAsLeaf(tmpStore);
- splitter.segmentBelow = true;
- splitter.setSplitter(false);
- for (TezOperator succ : succs) {
- // Replace POValueInputTez with POLoad
- POLoad tmpLoad = getTmpLoad(scope, fileSpec);
- succ.plan.replace(succ.plan.getRoots().get(0), tmpLoad);
- }
- } catch (Exception e) {
- throw new PlanException(e);
+ TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
+ generateNodeOperatorKey(), newOperPlan);
+ add(newPlanNode);
+ connect(planNode, newPlanNode);
+ split(newPlanNode);
+ if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) {
+ // On further split, the successor moved to a new plan container.
+ // Connect to that
+ TezOperatorFinder finder = new TezOperatorFinder(this, succ);
+ finder.visit();
+ disconnect(planNode, newPlanNode);
+ connect(planNode, finder.getPlanContainerNode());
}
+ } catch (VisitorException e) {
+ throw new PlanException(e);
}
}
split(planNode);
}
}
- private static class FirstLevelSegmentOperatorsFinder extends TezOpPlanVisitor {
+ private static class SegmentOperatorFinder extends TezOpPlanVisitor {
- private List<TezOperator> opersToSegment = new ArrayList<>();
+ private TezOperator operToSegment;
- public FirstLevelSegmentOperatorsFinder(TezOperPlan plan) {
+ public SegmentOperatorFinder(TezOperPlan plan) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
}
- public List<TezOperator> getOperatorsToSegment() {
- return opersToSegment;
+ public TezOperator getOperatorToSegment() {
+ return operToSegment;
}
@Override
- public void visitTezOp(TezOperator tezOp) throws VisitorException {
- if (tezOp.needSegmentBelow() && getPlan().getSuccessors(tezOp) != null) {
- if (opersToSegment.isEmpty()) {
- opersToSegment.add(tezOp);
- } else {
- // If the operator does not have dependency on previous
- // operators chosen for segmenting then add it to the
- // operators to be segmented together
- if (!hasPredecessor(tezOp, opersToSegment)) {
- opersToSegment.add(tezOp);
- }
- }
+ public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+ if (tezOperator.needSegmentBelow() && operToSegment == null) {
+ operToSegment = tezOperator;
}
}
- /**
- * Check if the tezOp has one of the opsToCheck as a predecessor.
- * It can be a immediate predecessor or multiple levels up.
- */
- private boolean hasPredecessor(TezOperator tezOp, List<TezOperator> opsToCheck) {
- List<TezOperator> predecessors = getPlan().getPredecessors(tezOp);
- if (predecessors != null) {
- for (TezOperator pred : predecessors) {
- if (opersToSegment.contains(pred)) {
- return true;
- } else {
- if (hasPredecessor(pred, opsToCheck)) {
- return true;
- }
- }
- }
- }
- return false;
- }
-
}
- private Set<TezOperator> getCommonSplitterPredecessors(TezOperPlan plan, TezOperator operToSegment, TezOperator successor) {
- Set<TezOperator> splitters1 = new HashSet<>();
- Set<TezOperator> splitters2 = new HashSet<>();
- Set<TezOperator> processedPredecessors = new HashSet<>();
- // Find predecessors which are splitters
- fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1);
- if (!splitters1.isEmpty()) {
- // For the successor, traverse rest of the plan below it and
- // search the predecessors of its successors to find any predecessor that might be a splitter.
- Set<TezOperator> allSuccs = new HashSet<>();
- getAllSuccessors(plan, successor, allSuccs);
- processedPredecessors.clear();
- processedPredecessors.add(successor);
- for (TezOperator succ : allSuccs) {
- fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2);
- }
- // Find the common ones
- splitters1.retainAll(splitters2);
+ private static class TezOperatorFinder extends TezPlanContainerVisitor {
+
+ private TezPlanContainerNode planContainerNode;
+ private TezOperator operatorToFind;
+
+ public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) {
+ super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan));
+ this.operatorToFind = operatorToFind;
}
- return splitters1;
- }
- private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp,
- Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) {
- List<TezOperator> predecessors = plan.getPredecessors(tezOp);
- if (predecessors != null) {
- for (TezOperator pred : predecessors) {
- // Skip processing already processed predecessor to avoid loops
- if (processedPredecessors.contains(pred)) {
- continue;
- }
- if (pred.isSplitter()) {
- splitters.add(pred);
- } else if (!pred.needSegmentBelow()) {
- processedPredecessors.add(pred);
- fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters);
- }
- }
+ public TezPlanContainerNode getPlanContainerNode() {
+ return planContainerNode;
}
- }
- private void getAllSuccessors(TezOperPlan plan, TezOperator tezOp, Set<TezOperator> allSuccs) {
- List<TezOperator> successors = plan.getSuccessors(tezOp);
- if (successors != null) {
- for (TezOperator succ : successors) {
- if (!allSuccs.contains(succ)) {
- allSuccs.add(succ);
- getAllSuccessors(plan, succ, allSuccs);
- }
+ @Override
+ public void visitTezPlanContainerNode(
+ TezPlanContainerNode tezPlanContainerNode)
+ throws VisitorException {
+ if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) {
+ planContainerNode = tezPlanContainerNode;
}
}
+
}
private synchronized OperatorKey generateNodeOperatorKey() {
@@ -350,21 +267,6 @@ public class TezPlanContainer extends Op
scopeId = 0;
}
- private POLoad getTmpLoad(String scope, FileSpec fileSpec){
- POLoad ld = new POLoad(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- ld.setPc(pigContext);
- ld.setIsTmpLoad(true);
- ld.setLFile(fileSpec);
- return ld;
- }
-
- private POStore getTmpStore(String scope, FileSpec fileSpec){
- POStore st = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- st.setIsTmpStore(true);
- st.setSFile(fileSpec);
- return new POStoreTez(st);
- }
-
@Override
public String toString() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Fri Feb 24 03:34:37 2017
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
@@ -81,9 +80,6 @@ public class TezPrinter extends TezOpPla
printer.setVerbose(isVerbose);
printer.visit();
mStream.println();
- } else if (edgeDesc.needsDistinctCombiner()) {
- mStream.println("# Combine plan on edge <" + inEdge + ">");
- mStream.println(DistinctCombiner.Combine.class.getName());
}
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java Fri Feb 24 03:34:37 2017
@@ -56,7 +56,6 @@ public class POCounterStatsTez extends P
private transient KeyValuesReader reader;
private transient KeyValueWriter writer;
private transient boolean finished = false;
- private transient boolean hasNext = false;
public POCounterStatsTez(OperatorKey k) {
super(k);
@@ -89,7 +88,6 @@ public class POCounterStatsTez extends P
try {
reader = (KeyValuesReader) input.getReader();
LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
- hasNext = reader.next();
} catch (Exception e) {
throw new ExecException(e);
}
@@ -132,13 +130,12 @@ public class POCounterStatsTez extends P
Integer key = null;
Long value = null;
// Read count of records per task
- while (hasNext) {
+ while (reader.next()) {
key = ((IntWritable)reader.getCurrentKey()).get();
for (Object val : reader.getCurrentValues()) {
value = ((LongWritable)val).get();
counterRecords.put(key, value);
}
- hasNext = reader.next();
}
// BinInterSedes only takes String for map key
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Fri Feb 24 03:34:37 2017
@@ -19,8 +19,6 @@
package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -103,13 +101,9 @@ public class POFRJoinTez extends POFRJoi
LogicalInput input = inputs.get(key);
if (!this.replInputs.contains(input)) {
this.replInputs.add(input);
- KeyValueReader reader = (KeyValueReader) input.getReader();
- this.replReaders.add(reader);
- log.info("Attached input from vertex " + key + " : input=" + input + ", reader=" + reader);
+ this.replReaders.add((KeyValueReader) input.getReader());
}
}
- // Do not force fetch input by reading first record. Cases like MultiQuery_Union_4 have
- // multiple POFRJoinTez loading same replicate input and will skip records
} catch (Exception e) {
throw new ExecException(e);
}
@@ -120,7 +114,6 @@ public class POFRJoinTez extends POFRJoi
*
* @throws ExecException
*/
- @SuppressWarnings("unchecked")
@Override
protected void setUpHashMap() throws ExecException {
@@ -128,8 +121,8 @@ public class POFRJoinTez extends POFRJoi
// where same POFRJoinTez occurs in different Split sub-plans
Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
if (cacheValue != null) {
- replicates = (List<Map<? extends Object, ? extends List<Tuple>>>) cacheValue;
- log.info("Found " + (replicates.size() - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
+ replicates = (TupleToMapKey[]) cacheValue;
+ log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
return;
}
@@ -155,7 +148,7 @@ public class POFRJoinTez extends POFRJoi
long time1 = System.currentTimeMillis();
- replicates.set(fragment, null);
+ replicates[fragment] = null;
int inputIdx = 0;
// We need to adjust the index because the number of replInputs is
// one less than the number of inputSchemas. The inputSchemas
@@ -165,12 +158,7 @@ public class POFRJoinTez extends POFRJoi
SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[schemaIdx];
SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[schemaIdx];
- Map<Object, ArrayList<Tuple>> replicate;
- if (keySchemaTupleFactory == null) {
- replicate = new HashMap<Object, ArrayList<Tuple>>(4000);
- } else {
- replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
- }
+ TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
POLocalRearrange lr = LRs[schemaIdx];
try {
@@ -180,8 +168,7 @@ public class POFRJoinTez extends POFRJoi
}
PigNullableWritable key = (PigNullableWritable) replReaders.get(inputIdx).getCurrentKey();
- Object keyValue = key.getValueAsPigType();
- if (isKeyNull(keyValue)) continue;
+ if (isKeyNull(key.getValueAsPigType())) continue;
NullableTuple val = (NullableTuple) replReaders.get(inputIdx).getCurrentValue();
// POFRJoin#getValueTuple() is reused to construct valTuple,
@@ -189,31 +176,27 @@ public class POFRJoinTez extends POFRJoi
// construct one here.
Tuple retTuple = mTupleFactory.newTuple(3);
retTuple.set(0, key.getIndex());
- retTuple.set(1, keyValue);
+ retTuple.set(1, key.getValueAsPigType());
retTuple.set(2, val.getValueAsPigType());
Tuple valTuple = getValueTuple(lr, retTuple);
- ArrayList<Tuple> values = replicate.get(keyValue);
- if (values == null) {
- if (inputSchemaTupleFactory == null) {
- values = new ArrayList<Tuple>(1);
- } else {
- values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
- }
- replicate.put(keyValue, values);
+ Tuple keyTuple = mTupleFactory.newTuple(1);
+ keyTuple.set(0, key.getValueAsPigType());
+ if (replicate.get(keyTuple) == null) {
+ replicate.put(keyTuple, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
}
- values.add(valTuple);
+ replicate.get(keyTuple).add(valTuple);
}
} catch (IOException e) {
throw new ExecException(e);
}
- replicates.set(schemaIdx, replicate);
+ replicates[schemaIdx] = replicate;
inputIdx++;
schemaIdx++;
}
long time2 = System.currentTimeMillis();
- log.info((replicates.size() - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
+ log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
ObjectCache.getInstance().cache(cacheKey, replicates);
log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Fri Feb 24 03:34:37 2017
@@ -57,7 +57,6 @@ public class POIdentityInOutTez extends
private transient KeyValuesReader shuffleReader;
private transient boolean shuffleInput;
private transient boolean finished = false;
- private transient boolean hasNext = false;
public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey) {
super(inputRearrange);
@@ -96,12 +95,9 @@ public class POIdentityInOutTez extends
Reader r = input.getReader();
if (r instanceof KeyValueReader) {
reader = (KeyValueReader) r;
- // Force input fetch
- hasNext = reader.next();
} else {
shuffleInput = true;
shuffleReader = (KeyValuesReader) r;
- hasNext = shuffleReader.next();
}
LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r);
} catch (Exception e) {
@@ -131,7 +127,7 @@ public class POIdentityInOutTez extends
return RESULT_EOP;
}
if (shuffleInput) {
- while (hasNext) {
+ while (shuffleReader.next()) {
Object curKey = shuffleReader.getCurrentKey();
Iterable<Object> vals = shuffleReader.getCurrentValues();
if (isSkewedJoin) {
@@ -143,10 +139,9 @@ public class POIdentityInOutTez extends
for (Object val : vals) {
writer.write(curKey, val);
}
- hasNext = shuffleReader.next();
}
} else {
- while (hasNext) {
+ while (reader.next()) {
if (isSkewedJoin) {
NullablePartitionWritable wrappedKey = new NullablePartitionWritable(
(PigNullableWritable) reader.getCurrentKey());
@@ -160,7 +155,6 @@ public class POIdentityInOutTez extends
writer.write(reader.getCurrentKey(),
reader.getCurrentValue());
}
- hasNext = reader.next();
}
}
finished = true;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Fri Feb 24 03:34:37 2017
@@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends
}
}
- public boolean containsOutputKey(String key) {
- return outputKey.equals(key);
+ public String getOutputKey() {
+ return outputKey;
}
public void setOutputKey(String outputKey) {
@@ -122,10 +122,6 @@ public class POLocalRearrangeTez extends
}
}
- protected Result getRearrangedTuple() throws ExecException {
- return super.getNextTuple();
- }
-
@Override
public Result getNextTuple() throws ExecException {
res = super.getNextTuple();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java Fri Feb 24 03:34:37 2017
@@ -51,7 +51,6 @@ public class PORankTez extends PORank im
private transient Map<Integer, Long> counterOffsets;
private transient Configuration conf;
private transient boolean finished = false;
- private transient Boolean hasFirstRecord;
public PORankTez(PORank copy) {
super(copy);
@@ -101,7 +100,6 @@ public class PORankTez extends PORank im
try {
reader = (KeyValueReader) input.getReader();
LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader);
- hasFirstRecord = reader.next();
} catch (Exception e) {
throw new ExecException(e);
}
@@ -142,18 +140,9 @@ public class PORankTez extends PORank im
Result inp = null;
try {
- if (hasFirstRecord != null) {
- if (hasFirstRecord) {
- hasFirstRecord = null;
- inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
- return addRank(inp);
- }
- hasFirstRecord = null;
- } else {
- while (reader.next()) {
- inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
- return addRank(inp);
- }
+ while (reader.next()) {
+ inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+ return addRank(inp);
}
} catch (IOException e) {
throw new ExecException(e);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Fri Feb 24 03:34:37 2017
@@ -25,8 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.backend.executionengine.ExecException;
@@ -34,16 +32,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalCachedBag;
-import org.apache.pig.data.ReadOnceBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -54,7 +48,6 @@ import org.apache.tez.runtime.library.co
public class POShuffleTezLoad extends POPackage implements TezInput {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class);
protected List<String> inputKeys = new ArrayList<String>();
private boolean isSkewedJoin = false;
@@ -68,7 +61,6 @@ public class POShuffleTezLoad extends PO
private transient WritableComparator groupingComparator = null;
private transient Configuration conf;
private transient int accumulativeBatchSize;
- private transient boolean readOnceOneBag;
public POShuffleTezLoad(POPackage pack) {
super(pack);
@@ -109,10 +101,7 @@ public class POShuffleTezLoad extends PO
// - Input key will be repeated, but index would be same within a TezInput
if (!this.inputs.contains(input)) {
this.inputs.add(input);
- KeyValuesReader reader = (KeyValuesReader)input.getReader();
- this.readers.add(reader);
- LOG.info("Attached input from vertex " + inputKey
- + " : input=" + input + ", reader=" + reader);
+ this.readers.add((KeyValuesReader)input.getReader());
}
}
@@ -128,13 +117,6 @@ public class POShuffleTezLoad extends PO
for (int i = 0; i < numTezInputs; i++) {
finished[i] = !readers.get(i).next();
}
-
- this.readOnceOneBag = (numInputs == 1)
- && (pkgr instanceof CombinerPackager
- || pkgr instanceof LitePackager || pkgr instanceof BloomPackager);
- if (readOnceOneBag) {
- readOnce[0] = true;
- }
} catch (Exception e) {
throw new ExecException(e);
}
@@ -205,47 +187,43 @@ public class POShuffleTezLoad extends PO
} else {
- if (readOnceOneBag) {
- bags[0] = new TezReadOnceBag(pkgr, min);
- } else {
- for (int i = 0; i < numInputs; i++) {
- bags[i] = new InternalCachedBag(numInputs);
- }
+ for (int i = 0; i < numInputs; i++) {
+ bags[i] = new InternalCachedBag(numInputs);
+ }
- if (numTezInputs == 1) {
- do {
- Iterable<Object> vals = readers.get(0).getCurrentValues();
- for (Object val : vals) {
- NullableTuple nTup = (NullableTuple) val;
- int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
- bags[index].add(tup);
- }
- finished[0] = !readers.get(0).next();
- if (finished[0]) {
- break;
- }
- cur = readers.get(0).getCurrentKey();
- } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
- } else {
- for (int i = 0; i < numTezInputs; i++) {
- if (!finished[i]) {
- cur = readers.get(i).getCurrentKey();
- // We need to loop in case of Grouping Comparators
- while (groupingComparator.compare(min, cur) == 0) {
- Iterable<Object> vals = readers.get(i).getCurrentValues();
- for (Object val : vals) {
- NullableTuple nTup = (NullableTuple) val;
- int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
- bags[index].add(tup);
- }
- finished[i] = !readers.get(i).next();
- if (finished[i]) {
- break;
- }
- cur = readers.get(i).getCurrentKey();
+ if (numTezInputs == 1) {
+ do {
+ Iterable<Object> vals = readers.get(0).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[0] = !readers.get(0).next();
+ if (finished[0]) {
+ break;
+ }
+ cur = readers.get(0).getCurrentKey();
+ } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+ } else {
+ for (int i = 0; i < numTezInputs; i++) {
+ if (!finished[i]) {
+ cur = readers.get(i).getCurrentKey();
+ // We need to loop in case of Grouping Comparators
+ while (groupingComparator.compare(min, cur) == 0) {
+ Iterable<Object> vals = readers.get(i).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[i] = !readers.get(i).next();
+ if (finished[i]) {
+ break;
}
+ cur = readers.get(i).getCurrentKey();
}
}
}
@@ -405,74 +383,4 @@ public class POShuffleTezLoad extends PO
}
- private class TezReadOnceBag extends ReadOnceBag {
-
- private static final long serialVersionUID = 1L;
- private Iterator<Object> iter;
-
- public TezReadOnceBag(Packager pkgr,
- PigNullableWritable currentKey) throws IOException {
- this.pkgr = pkgr;
- this.keyWritable = currentKey;
- this.iter = readers.get(0).getCurrentValues().iterator();
- }
-
- @Override
- public Iterator<Tuple> iterator() {
- return new TezReadOnceBagIterator();
- }
-
- private class TezReadOnceBagIterator implements Iterator<Tuple> {
-
- @Override
- public boolean hasNext() {
- if (iter.hasNext()) {
- return true;
- } else {
- try {
- finished[0] = !readers.get(0).next();
- if (finished[0]) {
- return false;
- }
- // Currently combiner is not being applied when secondary key(grouping comparator) is used
- // But might change in future. So check if the next key is same and return its values
- Object cur = readers.get(0).getCurrentKey();
- if (groupingComparator.compare(keyWritable, cur) == 0) {
- iter = readers.get(0).getCurrentValues().iterator();
- // Key should at least have one value. But doing a check just for safety
- if (iter.hasNext()) {
- return true;
- } else {
- throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values");
- }
- }
- return false;
- } catch (IOException e) {
- throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
- }
- }
- }
-
- @Override
- public Tuple next() {
- NullableTuple ntup = (NullableTuple) iter.next();
- int index = ntup.getIndex();
- Tuple ret = null;
- try {
- ret = pkgr.getValueTuple(keyWritable, ntup, index);
- } catch (ExecException e) {
- throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
- }
- return ret;
- }
-
- @Override
- public void remove() {
- throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
- }
- }
-
- }
-
-
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Fri Feb 24 03:34:37 2017
@@ -57,7 +57,6 @@ public class POShuffledValueInputTez ext
private transient Iterator<KeyValueReader> readers;
private transient KeyValueReader currentReader;
private transient Configuration conf;
- private transient Boolean hasFirstRecord;
public POShuffledValueInputTez(OperatorKey k) {
super(k);
@@ -99,8 +98,6 @@ public class POShuffledValueInputTez ext
}
readers = readersList.iterator();
currentReader = readers.next();
- // Force input fetch
- hasFirstRecord = currentReader.next();
} catch (Exception e) {
throw new ExecException(e);
}
@@ -114,15 +111,7 @@ public class POShuffledValueInputTez ext
}
do {
- if (hasFirstRecord != null) {
- if (hasFirstRecord) {
- hasFirstRecord = null;
- Tuple origTuple = (Tuple) currentReader.getCurrentValue();
- Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
- return new Result(POStatus.STATUS_OK, copy);
- }
- hasFirstRecord = null;
- } else if (currentReader.next()) {
+ if (currentReader.next()) {
Tuple origTuple = (Tuple) currentReader.getCurrentValue();
Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
return new Result(POStatus.STATUS_OK, copy);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Fri Feb 24 03:34:37 2017
@@ -60,8 +60,6 @@ public class POSimpleTezLoad extends POL
private transient Configuration conf;
private transient boolean finished = false;
private transient TezCounter inputRecordCounter;
- private transient boolean initialized;
- private transient boolean noTupleCopy;
public POSimpleTezLoad(OperatorKey k, LoadFunc loader) {
super(k, loader);
@@ -151,13 +149,7 @@ public class POSimpleTezLoad extends POL
} else {
Result res = new Result();
Tuple next = (Tuple) reader.getCurrentValue();
- if (!initialized) {
- noTupleCopy = mTupleFactory.newTuple(1).getClass().isInstance(next);
- initialized = true;
- }
- // Some Loaders return implementations of DefaultTuple instead of BinSedesTuple
- // In that case copy to BinSedesTuple
- res.result = noTupleCopy ? next : mTupleFactory.newTupleNoCopy(next.getAll());
+ res.result = next;
res.returnStatus = POStatus.STATUS_OK;
if (inputRecordCounter != null) {
inputRecordCounter.increment(1);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Fri Feb 24 03:34:37 2017
@@ -102,19 +102,19 @@ public class POStoreTez extends POStore
throw new ExecException(e);
}
- // Even if there is a single hdfs output, we add multi store counter
- // Makes it easier for user to see records for a particular store from
- // the DAG counter
- CounterGroup multiStoreGroup = processorContext.getCounters()
- .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- if (multiStoreGroup == null) {
- processorContext.getCounters().addGroup(
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- }
- String name = MRPigStatsUtil.getMultiStoreCounterName(this);
- if (name != null) {
- outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
+ // Multiple outputs - can be another store or other outputs (shuffle, broadcast)
+ if (outputs.size() > 1) {
+ CounterGroup multiStoreGroup = processorContext.getCounters()
+ .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ if (multiStoreGroup == null) {
+ processorContext.getCounters().addGroup(
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ }
+ String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+ if (name != null) {
+ outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
+ }
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Fri Feb 24 03:34:37 2017
@@ -57,7 +57,6 @@ public class POValueInputTez extends Phy
private transient KeyValuesReader shuffleReader;
private transient boolean shuffleInput;
private transient boolean hasNext;
- private transient Boolean hasFirstRecord;
public POValueInputTez(OperatorKey k) {
super(k);
@@ -93,8 +92,6 @@ public class POValueInputTez extends Phy
Reader r = input.getReader();
if (r instanceof KeyValueReader) {
reader = (KeyValueReader) r;
- // Force input fetch
- hasFirstRecord = reader.next();
} else {
shuffleInput = true;
shuffleReader = (KeyValuesReader) r;
@@ -121,22 +118,10 @@ public class POValueInputTez extends Phy
}
hasNext = shuffleReader.next();
}
- } else {
- if (hasFirstRecord != null) {
- if (hasFirstRecord) {
- hasFirstRecord = null;
- Tuple origTuple = (Tuple) reader.getCurrentValue();
- Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
- return new Result(POStatus.STATUS_OK, copy);
- }
- hasFirstRecord = null;
- } else {
- while (reader.next()) {
- Tuple origTuple = (Tuple) reader.getCurrentValue();
- Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
- return new Result(POStatus.STATUS_OK, copy);
- }
- }
+ } else if (reader.next()) {
+ Tuple origTuple = (Tuple) reader.getCurrentValue();
+ Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
}
finished = true;
// For certain operators (such as STREAM), we could still have some work
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Fri Feb 24 03:34:37 2017
@@ -69,11 +69,6 @@ public class CombinerOptimizer extends T
}
for (TezOperator from : predecessors) {
- PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
- if (!combinePlan.isEmpty()) {
- // Cases like bloom join have combine plan already set
- continue;
- }
List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
if (rearranges.isEmpty()) {
continue;
@@ -82,7 +77,7 @@ public class CombinerOptimizer extends T
POLocalRearrangeTez connectingLR = null;
PhysicalPlan rearrangePlan = from.plan;
for (POLocalRearrangeTez lr : rearranges) {
- if (lr.containsOutputKey(to.getOperatorKey().toString())) {
+ if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
connectingLR = lr;
break;
}
@@ -95,6 +90,7 @@ public class CombinerOptimizer extends T
// Detected the POLocalRearrange -> POPackage pattern. Let's add
// combiner if possible.
+ PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
if(!combinePlan.isEmpty()) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Fri Feb 24 03:34:37 2017
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -66,6 +65,11 @@ public class LoaderProcessor extends Tez
this.jobConf.setBoolean("mapred.mapper.new-api", true);
this.jobConf.setClass("mapreduce.inputformat.class",
PigInputFormat.class, InputFormat.class);
+ try {
+ this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
+ } catch (IOException e) {
+ throw new VisitorException(e);
+ }
}
/**
@@ -171,7 +175,6 @@ public class LoaderProcessor extends Tez
// splits can be moved to if(loads) block below
int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
tezOp.setRequestedParallelism(parallelism);
- tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job));
}
return lds;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Feb 24 03:34:37 2017
@@ -153,8 +153,6 @@ public class MultiQueryOptimizerTez exte
}
}
if (getPlan().getSuccessors(successor) != null) {
- nonPackageInputSuccessors.clear();
- toMergeSuccessors.clear();
for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
if (succSuccessor.isUnion()) {
if (!(unionOptimizerOn &&
@@ -173,13 +171,7 @@ public class MultiQueryOptimizerTez exte
continue;
}
}
- if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
- // Output goes to scalar or POFRJoinTez in the union operator
- // We need to ensure it is the only one to avoid parallel edges
- canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
- } else {
- toMergeSuccessors.add(succSuccessor);
- }
+ toMergeSuccessors.add(succSuccessor);
List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
if (unionSuccessors != null) {
for (TezOperator unionSuccessor : unionSuccessors) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Feb 24 03:34:37 2017
@@ -115,16 +115,11 @@ public class ParallelismSetter extends T
} else if (pc.defaultParallel != -1) {
parallelism = pc.defaultParallel;
}
- if (parallelism == 0) {
- // We need to produce empty output file.
- // Even if user set PARALLEL 0, mapreduce has 1 reducer
- parallelism = 1;
- }
boolean overrideRequestedParallelism = false;
if (parallelism != -1
&& autoParallelismEnabled
- && !tezOp.isDontEstimateParallelism()
&& tezOp.isIntermediateReducer()
+ && !tezOp.isDontEstimateParallelism()
&& tezOp.isOverrideIntermediateParallelism()) {
overrideRequestedParallelism = true;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Fri Feb 24 03:34:37 2017
@@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex
POLocalRearrangeTez connectingLR = null;
PhysicalPlan rearrangePlan = from.plan;
for (POLocalRearrangeTez lr : rearranges) {
- if (lr.containsOutputKey(to.getOperatorKey().toString())) {
+ if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
connectingLR = lr;
break;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java Fri Feb 24 03:34:37 2017
@@ -30,8 +30,6 @@ public class TezEstimatedParallelismClea
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
- if (!tezOp.isDontEstimateParallelism()) {
- tezOp.setEstimatedParallelism(-1);
- }
+ tezOp.setEstimatedParallelism(-1);
}
}