You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/17 09:26:52 UTC
svn commit: r1568901 - in /pig/branches/tez/src/org/apache/pig:
backend/hadoop/executionengine/physicalLayer/expressionOperators/
backend/hadoop/executionengine/physicalLayer/plans/
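backend/hadoop/executionengine/physicalLayer/relationalOperators/
backend/hadoop/executionengine/tez/
backend/hadoop/executionengine/tez/util/
impl/builtin/
newplan/logical/visitor/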
Author: daijy
Date: Mon Feb 17 08:26:52 2014
New Revision: 1568901
URL: http://svn.apache.org/r1568901
Log:
PIG-3757: Make scalar work
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Feb 17 08:26:52 2014
@@ -538,6 +538,11 @@ public class POUserFunc extends Expressi
public FuncSpec getFuncSpec() {
return funcSpec;
}
+
+ /**
+ * Replaces this operator's function spec and calls {@code instantiateFunc}
+ * with it, so that the UDF instance is rebuilt from the new spec (TezCompiler
+ * uses this to swap scalar lookups over to ReadScalarsTez).
+ */
+ public void setFuncSpec(FuncSpec funcSpec) {
+ this.funcSpec = funcSpec;
+ instantiateFunc(funcSpec);
+ }
public String[] getCacheFiles() {
return cacheFiles;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Mon Feb 17 08:26:52 2014
@@ -294,7 +294,12 @@ public class PhyPlanVisitor extends Plan
}
public void visitLimit(POLimit lim) throws VisitorException{
- //do nothing
+ // Walk the limit expression plan, if there is one, with a child walker so that
+ // visitors also reach the operators nested inside it.
+ PhysicalPlan inpPlan = lim.getLimitPlan();
+ if (inpPlan!=null) {
+ pushWalker(mCurrentWalker.spawnChildWalker(inpPlan));
+ visit();
+ popWalker();
+ }
}
public void visitCross(POCross cross) throws VisitorException{
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Mon Feb 17 08:26:52 2014
@@ -59,7 +59,7 @@ public class CombinerPackager extends Pa
super();
keyType = pkgr.keyType;
numInputs = 1;
- inner = new boolean[1];
+ inner = new boolean[pkgr.inner.length];
for (int i = 0; i < pkgr.inner.length; i++) {
inner[i] = true;
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1568901&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Mon Feb 17 08:26:52 2014
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Merges the successors ("splittees") of a splitter {@link TezOperator} into the
+ * splitter itself, so that fewer Tez vertices are needed.
+ * <p>
+ * When the splitter has exactly one successor and that successor qualifies, the
+ * two physical plans are stitched together directly. Otherwise every qualifying
+ * successor becomes a sub-plan of a {@link POSplit} that replaces the splitter's
+ * {@link POValueOutputTez} leaf.
+ */
+public class MultiQueryOptimizerTez extends TezOpPlanVisitor {
+
+    /**
+     * @param plan the Tez operator plan to optimize; it is walked in reverse
+     *        dependency order
+     */
+    public MultiQueryOptimizerTez(TezOperPlan plan) {
+        super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    /**
+     * If {@code tezOp} is a splitter, merges its qualifying successors into it;
+     * any other operator is left untouched.
+     */
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        try {
+            if (!tezOp.isSplitter()) {
+                return;
+            }
+
+            List<TezOperator> successors = getPlan().getSuccessors(tezOp);
+            if (successors == null) {
+                // A splitter without successors has nothing to merge.
+                return;
+            }
+
+            List<TezOperator> splittees = new ArrayList<TezOperator>();
+            // Successors of the candidates examined so far; used to detect diamonds.
+            List<TezOperator> succSuccessors = new ArrayList<TezOperator>();
+            for (TezOperator successor : successors) {
+                // Don't want to be complicated by nested split
+                if (successor.isSplitter()) {
+                    continue;
+                }
+                // If it has other dependencies, don't merge it into the split
+                if (getPlan().getPredecessors(successor).size() != 1) {
+                    continue;
+                }
+                if (containsBlacklistedOp(successor)) {
+                    continue;
+                }
+                // Detect a diamond shape: it cannot be merged into the split, since
+                // Tez does not handle a double edge between two vertices
+                List<TezOperator> grandSuccessors = getPlan().getSuccessors(successor);
+                boolean sharedSucc = false;
+                if (grandSuccessors != null) {
+                    for (TezOperator grandSuccessor : grandSuccessors) {
+                        if (succSuccessors.contains(grandSuccessor)) {
+                            sharedSucc = true;
+                            break;
+                        }
+                    }
+                    succSuccessors.addAll(grandSuccessors);
+                }
+                if (sharedSucc) {
+                    continue;
+                }
+                splittees.add(successor);
+            }
+
+            if (splittees.isEmpty()) {
+                return;
+            }
+
+            if (splittees.size() == 1 && successors.size() == 1) {
+                // No POSplit is needed: merge the only splittee into the splitter
+                mergeSingleSplittee(tezOp, splittees.get(0));
+            } else {
+                mergeIntoSplit(tezOp, splittees);
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    /** Returns true if the operator's plan contains a sampler, which is never merged. */
+    private static boolean containsBlacklistedOp(TezOperator tezOp) {
+        for (PhysicalOperator op : tezOp.plan) {
+            if (op instanceof POReservoirSample || op instanceof POPoissonSample) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Stitches the plan of the splitter's only successor onto the splitter's plan:
+     * the splitter's leaf and the splittee's root are dropped, the operators on
+     * either side of them are connected, and the splittee is removed.
+     */
+    private void mergeSingleSplittee(TezOperator splitter, TezOperator splittee)
+            throws PlanException, VisitorException {
+        PhysicalOperator splitterLeaf = splitter.plan.getLeaves().get(0);
+        PhysicalOperator splitterLeafPred = splitter.plan.getPredecessors(splitterLeaf).get(0);
+
+        PhysicalOperator splitteeRoot = splittee.plan.getRoots().get(0);
+        PhysicalOperator splitteeRootSucc = splittee.plan.getSuccessors(splitteeRoot).get(0);
+
+        splitter.plan.remove(splitterLeaf);
+        splittee.plan.remove(splitteeRoot);
+
+        //TODO remove filter all
+
+        splitter.plan.merge(splittee.plan);
+        splitter.plan.connect(splitterLeafPred, splitteeRootSucc);
+
+        addSubPlanPropertiesToParent(splitter, splittee);
+
+        removeSplittee(getPlan(), splitter, splittee);
+    }
+
+    /**
+     * Replaces the splitter's {@link POValueOutputTez} leaf with a {@link POSplit}
+     * holding one sub-plan per splittee. If other successors still read the
+     * splitter's output, the value output is kept as an additional sub-plan.
+     */
+    private void mergeIntoSplit(TezOperator splitter, List<TezOperator> splittees)
+            throws PlanException, VisitorException {
+        POValueOutputTez valueOutput = (POValueOutputTez) splitter.plan.getLeaves().get(0);
+        POSplit split = new POSplit(OperatorKey.genOpKey(valueOutput.getOperatorKey().getScope()));
+        for (TezOperator splittee : splittees) {
+            // Drop the splittee's root: the POSplit feeds its sub-plans directly
+            PhysicalOperator splitteeRoot = splittee.plan.getRoots().get(0);
+            splittee.plan.remove(splitteeRoot);
+            split.addPlan(splittee.plan);
+
+            addSubPlanPropertiesToParent(splitter, splittee);
+
+            removeSplittee(getPlan(), splitter, splittee);
+            valueOutput.outputKeys.remove(splittee.getOperatorKey().toString());
+        }
+        if (!valueOutput.outputKeys.isEmpty()) {
+            // Other successors still consume the splitter's output; keep it
+            PhysicalPlan phyPlan = new PhysicalPlan();
+            phyPlan.addAsLeaf(valueOutput);
+            split.addPlan(phyPlan);
+        }
+        PhysicalOperator pred = splitter.plan.getPredecessors(valueOutput).get(0);
+        splitter.plan.disconnect(pred, valueOutput);
+        splitter.plan.remove(valueOutput);
+        splitter.plan.add(split);
+        splitter.plan.connect(pred, split);
+    }
+
+    /**
+     * Removes {@code splittee}, which has already been merged into {@code splitter},
+     * from {@code plan}. Each successor of the splittee is re-connected to the
+     * splitter, reusing the edge descriptor it had from the splittee.
+     *
+     * @throws PlanException if the plan cannot be re-wired
+     */
+    public static void removeSplittee(TezOperPlan plan, TezOperator splitter, TezOperator splittee)
+            throws PlanException {
+        OperatorKey splitteeKey = splittee.getOperatorKey();
+        // The splittee is going away, so the splitter must not keep an edge descriptor
+        // pointing at it, even when the splittee has no successors of its own.
+        splitter.outEdges.remove(splitteeKey);
+        List<TezOperator> succs = plan.getSuccessors(splittee);
+        if (succs != null) {
+            // Copy first: the plan's successor list changes while edges are moved.
+            succs = new ArrayList<TezOperator>(succs);
+            plan.disconnect(splitter, splittee);
+            for (TezOperator succ : succs) {
+                TezEdgeDescriptor edge = succ.inEdges.get(splitteeKey);
+                succ.inEdges.remove(splitteeKey);
+                plan.disconnect(splittee, succ);
+                TezCompilerUtil.connect(plan, splitter, succ, edge);
+            }
+        }
+        plan.remove(splittee);
+    }
+
+    /**
+     * Folds the properties of a merged operator into the operator that absorbed it:
+     * the larger requested parallelism, the UDFs, the scalars and the outgoing edges.
+     */
+    public static void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) {
+        if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
+            parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
+        }
+        subPlanOper.setRequestedParallelismByReference(parentOper);
+        if (subPlanOper.UDFs != null) {
+            parentOper.UDFs.addAll(subPlanOper.UDFs);
+        }
+        if (subPlanOper.scalars != null) {
+            parentOper.scalars.addAll(subPlanOper.scalars);
+        }
+        if (subPlanOper.outEdges != null) {
+            parentOper.outEdges.putAll(subPlanOper.outEdges);
+        }
+    }
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Feb 17 08:26:52 2014
@@ -33,12 +33,14 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.BinSedesTuple;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.ReadScalarsTez;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.tez.common.TezUtils;
@@ -158,6 +160,12 @@ public class PigProcessor implements Log
for (POValueInputTez input : valueInputs){
input.attachInputs(inputs, conf);
}
+ LinkedList<POUserFunc> scalarInputs = PlanHelper.getPhysicalOperators(execPlan, POUserFunc.class);
+ for (POUserFunc userFunc : scalarInputs ) {
+ if (userFunc.getFunc() instanceof ReadScalarsTez) {
+ ((ReadScalarsTez)userFunc.getFunc()).attachInputs(inputs, conf);
+ }
+ }
LinkedList<POFRJoinTez> broadcasts = PlanHelper.getPhysicalOperators(execPlan, POFRJoinTez.class);
for (POFRJoinTez broadcast : broadcasts){
broadcast.attachInputs(inputs, conf);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java Mon Feb 17 08:26:52 2014
@@ -65,6 +65,10 @@ public class SecondaryKeyOptimizerTez ex
break;
}
}
+
+ if (connectingLR == null) {
+ continue;
+ }
// Detected the POLocalRearrange -> POPackage pattern. Let's add
// combiner if possible.
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Feb 17 08:26:52 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -84,6 +85,7 @@ import org.apache.pig.impl.builtin.Defau
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.GetMemNumRows;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.builtin.ReadScalarsTez;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -223,17 +225,6 @@ public class TezCompiler extends PhyPlan
for (PhysicalOperator op : ops) {
compile(op);
- if (curTezOp.isSplitSubPlan()) {
- // Set inputs to null as POSplit will attach input to roots
- for (PhysicalOperator root : curTezOp.plan.getRoots()) {
- root.setInputs(null);
- }
- TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey());
- POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey());
- split.addPlan(curTezOp.plan);
- addSubPlanPropertiesToParent(splitOp, curTezOp);
- curTezOp = splitOp;
- }
}
for (TezOperator tezOper : splitsSeen.values()) {
@@ -245,39 +236,42 @@ public class TezCompiler extends PhyPlan
}
tezOper.setClosed(true);
}
-
- connectSoftLink();
+
+ fixScalar();
return tezPlan;
}
+
+ private void fixScalar() throws VisitorException, PlanException {
+ // Mapping POStore to POValueOuptut
+ Map<POStore, POValueOutputTez> storeSeen = new HashMap<POStore, POValueOutputTez>();
+
+ for (TezOperator tezOp : tezPlan) {
+ List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
+ for (POUserFunc userFunc : userFuncs) {
+ if (userFunc.getReferencedOperator()!=null) { // Scalar
+ POStore store = (POStore)userFunc.getReferencedOperator();
+
+ TezOperator from = phyToTezOpMap.get(store);
- private void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) {
- if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
- parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
- }
- subPlanOper.setRequestedParallelismByReference(parentOper);
- if (subPlanOper.UDFs != null) {
- parentOper.UDFs.addAll(subPlanOper.UDFs);
- }
- if (subPlanOper.outEdges != null) {
- for (Entry<OperatorKey, TezEdgeDescriptor> entry: subPlanOper.outEdges.entrySet()) {
- parentOper.outEdges.put(entry.getKey(), entry.getValue());
- }
- }
- }
+ FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
+ userFunc.setFuncSpec(newSpec);
- private void connectSoftLink() throws PlanException, IOException {
- for (PhysicalOperator op : plan) {
- if (plan.getSoftLinkPredecessors(op)!=null) {
- for (PhysicalOperator pred : plan.getSoftLinkPredecessors(op)) {
- TezOperator from = phyToTezOpMap.get(pred);
- TezOperator to = phyToTezOpMap.get(op);
- if (from==to) {
- continue;
- }
- if (tezPlan.getPredecessors(to)==null || !tezPlan.getPredecessors(to).contains(from)) {
- tezPlan.connect(from, to);
+ if (storeSeen.containsKey(store)) {
+ storeSeen.get(store).outputKeys.add(tezOp.getOperatorKey().toString());
+ } else {
+ POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ output.addOutputKey(tezOp.getOperatorKey().toString());
+ from.plan.remove(from.plan.getOperator(store.getOperatorKey()));
+ from.plan.addAsLeaf(output);
+ storeSeen.put(store, output);
}
+
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, from, tezOp);
+ //TODO shared edge once support is available in Tez
+ edge.dataMovementType = DataMovementType.BROADCAST;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
}
}
}
@@ -314,33 +308,66 @@ public class TezCompiler extends PhyPlan
}
PhysicalOperator p = predecessors.get(0);
- TezOperator oper = null;
+ TezOperator storeTezOper = null;
if (p instanceof POStore) {
- oper = phyToTezOpMap.get(p);
+ storeTezOper = phyToTezOpMap.get(p);
} else {
int errCode = 2126;
- String msg = "Predecessor of load should be a store. Got "+p.getClass();
+ String msg = "Predecessor of load should be a store. Got " + p.getClass();
throw new PlanException(msg, errCode, PigException.BUG);
}
-
- // Need new operator
+ PhysicalOperator store = storeTezOper.plan.getOperator(p.getOperatorKey());
+ // replace POStore to POValueOutputTez, convert the tezOperator to splitter
+ storeTezOper.plan.disconnect(storeTezOper.plan.getPredecessors(store).get(0), store);
+ storeTezOper.plan.remove(store);
+ POValueOutputTez valueOutput = new POValueOutputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ storeTezOper.plan.addAsLeaf(valueOutput);
+ storeTezOper.setSplitter(true);
+
+ // Create a splittee of store only
+ TezOperator storeOnlyTezOperator = getTezOp();
+ PhysicalPlan storeOnlyPhyPlan = new PhysicalPlan();
+ POValueInputTez valueInput = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ valueInput.setInputKey(storeTezOper.getOperatorKey().toString());
+ storeOnlyPhyPlan.addAsLeaf(valueInput);
+ storeOnlyPhyPlan.addAsLeaf(store);
+ storeOnlyTezOperator.plan = storeOnlyPhyPlan;
+ tezPlan.add(storeOnlyTezOperator);
+ phyToTezOpMap.put(store, storeOnlyTezOperator);
+
+ // Create new operator as second splittee
curTezOp = getTezOp();
- curTezOp.plan.add(op);
+ POValueInputTez valueInput2 = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ valueInput2.setInputKey(storeTezOper.getOperatorKey().toString());
+ curTezOp.plan.add(valueInput2);
tezPlan.add(curTezOp);
- plan.disconnect(op, p);
- oper.segmentBelow = true;
- tezPlan.connect(oper, curTezOp);
- phyToTezOpMap.put(op, curTezOp);
+ // Connect splitter to splittee
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, storeTezOper, storeOnlyTezOperator);
+ edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ storeOnlyTezOperator.setRequestedParallelismByReference(storeTezOper);
+
+ edge = TezCompilerUtil.connect(tezPlan, storeTezOper, curTezOp);
+ edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ curTezOp.setRequestedParallelismByReference(storeTezOper);
+
return;
}
Collections.sort(predecessors);
- compiledInputs = new TezOperator[predecessors.size()];
- int i = -1;
- for (PhysicalOperator pred : predecessors) {
- compile(pred);
- compiledInputs[++i] = curTezOp;
+ if(op instanceof POSplit && splitsSeen.containsKey(op.getOperatorKey())){
+ // skip follow up POSplit
+ } else {
+ compiledInputs = new TezOperator[predecessors.size()];
+ int i = -1;
+ for (PhysicalOperator pred : predecessors) {
+ compile(pred);
+ compiledInputs[++i] = curTezOp;
+ }
}
} else {
// No predecessors. Mostly a load. But this is where we start. We
@@ -365,20 +392,6 @@ public class TezCompiler extends PhyPlan
compiledInputs = prevCompInp;
}
- /**
- * Start a new TezOperator whose plan will be the sub-plan of POSplit
- *
- * @param splitOperatorKey
- * OperatorKey of the POSplit for which the new plan is a sub-plan
- * @return the new TezOperator
- * @throws PlanException
- */
- private TezOperator startNew(OperatorKey splitOperatorKey) throws PlanException {
- TezOperator ret = getTezOp();
- ret.setSplitOperatorKey(splitOperatorKey);
- return ret;
- }
-
private void nonBlocking(PhysicalOperator op) throws PlanException, IOException {
TezOperator tezOp;
if (compiledInputs.length == 1) {
@@ -400,46 +413,11 @@ public class TezCompiler extends PhyPlan
tezPlan.add(newTezOp);
for (TezOperator tezOp : compiledInputs) {
tezOp.setClosed(true);
- handleSplitAndConnect(tezPlan, tezOp, newTezOp);
+ TezCompilerUtil.connect(tezPlan, tezOp, newTezOp);
}
curTezOp = newTezOp;
}
- private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan, TezOperator from, TezOperator to)
- throws PlanException {
- return handleSplitAndConnect(tezPlan, from, to, true);
- }
-
- private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan,
- TezOperator from, TezOperator to, boolean addToSplitPlan)
- throws PlanException {
- // Add edge descriptors from POLocalRearrange in POSplit
- // sub-plan to new operators
- PhysicalOperator leaf = from.plan.getLeaves().get(0);
- // It could be POStoreTez incase of sampling job in order by
- if (leaf instanceof POLocalRearrangeTez) {
- POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
- lr.setOutputKey(to.getOperatorKey().toString());
- }
- TezEdgeDescriptor edge = null;
- if (from.isSplitSubPlan()) {
- TezOperator splitOp = splitsSeen.get(from.getSplitOperatorKey());
- if (addToSplitPlan) {
- // Set inputs to null as POSplit will attach input to roots
- for (PhysicalOperator root : from.plan.getRoots()) {
- root.setInputs(null);
- }
- POSplit split = findPOSplit(splitOp, from.getSplitOperatorKey());
- split.addPlan(from.plan);
- addSubPlanPropertiesToParent(splitOp, curTezOp);
- }
- edge = TezCompilerUtil.connect(tezPlan, splitOp, to);
- } else {
- edge = TezCompilerUtil.connect(tezPlan, from, to);
- }
- return edge;
- }
-
private POSplit findPOSplit(TezOperator tezOp, OperatorKey splitKey)
throws PlanException {
POSplit split = (POSplit) tezOp.plan.getOperator(splitKey);
@@ -473,105 +451,6 @@ public class TezCompiler extends PhyPlan
}
/**
- * Remove the operator and the whole tree connected to that operator from
- * the plan. Only remove corresponding connected sub-plan if you encounter
- * another Split operator in the predecessor.
- *
- * @param op Operator to remove
- * @throws VisitorException
- */
- private void removeDupOpTreeOfSplit(TezOperPlan plan, TezOperator op, boolean isMultiQuery)
- throws VisitorException {
- Stack<TezOperator> stack = new Stack<TezOperator>();
- stack.push(op);
- while (!stack.isEmpty()) {
- op = stack.pop();
- List<TezOperator> predecessors = plan.getPredecessors(op);
- if (predecessors != null) {
- if (isMultiQuery) {
- for (TezOperator pred : predecessors) {
- if (!pred.isSplitOperator()) {
- stack.push(pred);
- } else {
- List<POSplit> splits = PlanHelper.getPhysicalOperators(
- pred.plan, POSplit.class);
- for (POSplit split : splits) {
- PhysicalPlan planToRemove = null;
- for (PhysicalPlan splitPlan : split.getPlans()) {
- PhysicalOperator phyOp = splitPlan
- .getLeaves().get(0);
- if (phyOp instanceof POLocalRearrangeTez) {
- POLocalRearrangeTez lr = (POLocalRearrangeTez) phyOp;
- if (lr.getOutputKey().equals(
- op.getOperatorKey().toString())) {
- planToRemove = splitPlan;
- break;
- }
- }
- }
- if (planToRemove != null) {
- split.getPlans().remove(planToRemove);
- break;
- }
- }
- }
- }
- } else {
- for (TezOperator pred : predecessors) {
- // Remove everything till we encounter another split
- if (!pred.isSplitOperator()) {
- stack.push(pred);
- } else {
- // If split operator, just remove from the output
- POValueOutputTez valueOut = (POValueOutputTez)pred.plan.getLeaves().get(0);
- valueOut.removeOutputKey(op.getOperatorKey().toString());
- //TODO Handle shared edge when available in Tez
- pred.outEdges.remove(op.getOperatorKey().toString());
- }
- }
- }
- }
- plan.remove(op);
- }
- }
-
- /**
- * In case of mulitple levels of split, after removing duplicate tree we need to reset
- * input of operators in the old tree as some of the inputs of the PhysicalOperator in
- * original tree will now be overwritten and referring to operators in
- * duplicate tree. For eg: POFilter inputs will refer to the duplicate tree's
- * POValueInputTez even though it is connected to a original split tree's POValueInputTez
- */
- private void resetInputsOfPredecessors(TezOperPlan plan, TezOperator op) {
- Stack<TezOperator> stack = new Stack<TezOperator>();
- stack.push(op);
- while (!stack.isEmpty()) {
- op = stack.pop();
- List<TezOperator> predecessors = plan.getPredecessors(op);
- if (predecessors != null) {
- for (TezOperator pred : predecessors) {
- resetInputs(pred.plan, pred.plan.getLeaves());
- if (!pred.isSplitOperator()) {
- stack.push(pred);
- }
- }
- }
- }
- }
-
- private void resetInputs(PhysicalPlan plan, List<PhysicalOperator> ops) {
- for (PhysicalOperator op : ops) {
- List<PhysicalOperator> preds = plan.getPredecessors(op);
- if (preds != null) {
- for (PhysicalOperator pred : preds) {
- pred.setInputs(plan.getPredecessors(pred));
- resetInputs(plan, plan.getPredecessors(pred));
- }
- }
- }
- }
-
- /**
* Merges the TezOperators in the compiledInputs into a single merged
* TezOperator.
*
@@ -702,7 +581,7 @@ public class TezCompiler extends PhyPlan
@Override
public void visitDistinct(PODistinct op) throws VisitorException {
try {
- POLocalRearrange lr = localRearrangeFactory.create();
+ POLocalRearrangeTez lr = localRearrangeFactory.create();
lr.setDistinct(true);
lr.setAlias(op.getAlias());
curTezOp.plan.addAsLeaf(lr);
@@ -751,23 +630,6 @@ public class TezCompiler extends PhyPlan
@Override
public void visitFilter(POFilter op) throws VisitorException {
try {
- if (curTezOp.isSplitSubPlan() || curTezOp.getSplitParent() != null) {
- // Do not add the filter. Refer NoopFilterRemover.java of MR
- PhysicalPlan filterPlan = op.getPlan();
- if (filterPlan.size() == 1) {
- PhysicalOperator fp = filterPlan.getRoots().get(0);
- if (fp instanceof ConstantExpression) {
- ConstantExpression exp = (ConstantExpression)fp;
- Object value = exp.getValue();
- if (value instanceof Boolean) {
- Boolean filterValue = (Boolean)value;
- if (filterValue) {
- return;
- }
- }
- }
- }
- }
nonBlocking(op);
processUDFs(op.getPlan());
phyToTezOpMap.put(op, curTezOp);
@@ -795,7 +657,7 @@ public class TezCompiler extends PhyPlan
lr.setOutputKey(curTezOp.getOperatorKey().toString());
tezOp.plan.addAsLeaf(lr);
- TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, tezOp, curTezOp);
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
if (tezOp.getSplitOperatorKey() != null) {
inputKeys.add(tezOp.getSplitOperatorKey().toString());
} else {
@@ -1983,7 +1845,7 @@ public class TezCompiler extends PhyPlan
Pair<TezOperator, Integer> quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp);
TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
- TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]);
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, prevOper, sortOpers[0]);
// Use 1-1 edge
edge.dataMovementType = DataMovementType.ONE_TO_ONE;
@@ -2003,7 +1865,7 @@ public class TezCompiler extends PhyPlan
sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
*/
- handleSplitAndConnect(tezPlan, prevOper, quantJobParallelismPair.first, false);
+ TezCompilerUtil.connect(tezPlan, prevOper, quantJobParallelismPair.first);
lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
@@ -2036,66 +1898,30 @@ public class TezCompiler extends PhyPlan
@Override
public void visitSplit(POSplit op) throws VisitorException {
try {
- boolean isMultiQuery = "true".equalsIgnoreCase(pigContext
- .getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
-
- if (isMultiQuery) {
- if (splitsSeen.containsKey(op.getOperatorKey())) {
- // Since the plan for this split already exists in the tez plan,
- // discard the hierarchy or tez operators we constructed so far
- // till we encountered the split in this tree
- removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery);
- curTezOp = startNew(op.getOperatorKey());
- } else {
- nonBlocking(op);
- if(curTezOp.isSplitSubPlan()) {
- // Split followed by another split
- // Set inputs to null as POSplit will attach input to roots
- for (PhysicalOperator root : curTezOp.plan.getRoots()) {
- root.setInputs(null);
- }
- TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey());
- POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey());
- split.addPlan(curTezOp.plan);
- addSubPlanPropertiesToParent(splitOp, curTezOp);
- splitsSeen.put(op.getOperatorKey(), splitOp);
- phyToTezOpMap.put(op, splitOp);
- } else {
- curTezOp.setSplitOperator(true);
- splitsSeen.put(op.getOperatorKey(), curTezOp);
- phyToTezOpMap.put(op, curTezOp);
- }
- curTezOp = startNew(op.getOperatorKey());
- }
+ TezOperator splitOp = curTezOp;
+ POValueOutputTez output = null;
+ if (splitsSeen.containsKey(op.getOperatorKey())) {
+ splitOp = splitsSeen.get(op.getOperatorKey());
+ output = (POValueOutputTez)splitOp.plan.getLeaves().get(0);
} else {
- TezOperator splitOp = curTezOp;
- POValueOutputTez output = null;
- if (splitsSeen.containsKey(op.getOperatorKey())) {
- removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery);
- splitOp = splitsSeen.get(op.getOperatorKey());
- resetInputsOfPredecessors(tezPlan, splitOp);
- output = (POValueOutputTez)splitOp.plan.getLeaves().get(0);
- } else {
- splitOp.setSplitOperator(true);
- splitsSeen.put(op.getOperatorKey(), splitOp);
- phyToTezOpMap.put(op, splitOp);
- output = new POValueOutputTez(OperatorKey.genOpKey(scope));
- splitOp.plan.addAsLeaf(output);
- }
- curTezOp = getTezOp();
- curTezOp.setSplitParent(splitOp.getOperatorKey());
- tezPlan.add(curTezOp);
- output.addOutputKey(curTezOp.getOperatorKey().toString());
- TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
- //TODO shared edge once support is available in Tez
- edge.dataMovementType = DataMovementType.ONE_TO_ONE;
- edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
- curTezOp.setRequestedParallelismByReference(splitOp);
- POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
- input.setInputKey(splitOp.getOperatorKey().toString());
- curTezOp.plan.addAsLeaf(input);
+ splitsSeen.put(op.getOperatorKey(), splitOp);
+ splitOp.setSplitter(true);
+ phyToTezOpMap.put(op, splitOp);
+ output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ splitOp.plan.addAsLeaf(output);
}
+ curTezOp = getTezOp();
+ tezPlan.add(curTezOp);
+ output.addOutputKey(curTezOp.getOperatorKey().toString());
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
+ //TODO shared edge once support is available in Tez
+ edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ curTezOp.setRequestedParallelismByReference(splitOp);
+ POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+ input.setInputKey(splitOp.getOperatorKey().toString());
+ curTezOp.plan.addAsLeaf(input);
} catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator "
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Feb 17 08:26:52 2014
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -200,6 +201,24 @@ public class TezDagBuilder extends TezOp
break;
}
}
+
+ List<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(from.plan,
+ POValueOutputTez.class);
+ if (!valueOutputs.isEmpty()) {
+ POValueOutputTez valueOutput = valueOutputs.get(0);
+ for (String outputKey : valueOutput.outputKeys) {
+ if (outputKey.equals(to.getOperatorKey().toString())) {
+ conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+ POValueOutputTez.EmptyWritable.class.getName());
+ conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+ BinSedesTuple.class.getName());
+ conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+ POValueOutputTez.EmptyWritable.class.getName());
+ conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+ BinSedesTuple.class.getName());
+ }
+ }
+ }
conf.setBoolean("mapred.mapper.new-api", true);
conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
@@ -240,17 +259,6 @@ public class TezDagBuilder extends TezOp
edge.partitionerClass.getName());
}
- if (from.plan.getLeaves().get(0) instanceof POValueOutputTez) {
- conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
- POValueOutputTez.EmptyWritable.class.getName());
- conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
- BinSedesTuple.class.getName());
- conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
- POValueOutputTez.EmptyWritable.class.getName());
- conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
- BinSedesTuple.class.getName());
- }
-
MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
@@ -367,13 +375,22 @@ public class TezDagBuilder extends TezOp
// Set input keys for POShuffleTezLoad. This is used to identify
// the inputs that are attached to the POShuffleTezLoad in the
// backend.
+ Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>();
for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) {
// skip sample vertex input
} else {
- newPack.addInputKey(pred.getOperatorKey().toString());
+ LinkedList<POLocalRearrangeTez> lrs = PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
+ for (POLocalRearrangeTez lr : lrs) {
+ if (lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+ localRearrangeMap.put((int)lr.getIndex(), pred.getOperatorKey().toString());
+ }
+ }
}
}
+ for (Map.Entry<Integer, String> entry : localRearrangeMap.entrySet()) {
+ newPack.addInputKey(entry.getValue());
+ }
if (succsList != null) {
for (PhysicalOperator succs : succsList) {
@@ -383,8 +400,22 @@ public class TezDagBuilder extends TezOp
setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf,
tezOp);
+ } else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) {
+ POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0);
+ // TODO Need to fix multiple input key mapping
+ TezOperator identityInOutPred = null;
+ for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
+ if (!pred.isSampler()) {
+ identityInOutPred = pred;
+ break;
+ }
+ }
+ identityInOut.setInputKey(identityInOutPred.getOperatorKey().toString());
+ } else if (roots.size() == 1 && roots.get(0) instanceof POValueInputTez) {
+ POValueInputTez valueInput = (POValueInputTez) roots.get(0);
+ TezOperator pred = mPlan.getPredecessors(tezOp).get(0);
+ valueInput.setInputKey(pred.getOperatorKey().toString());
}
-
payloadConf.setClass("mapreduce.outputformat.class",
PigOutputFormat.class, OutputFormat.class);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Feb 17 08:26:52 2014
@@ -168,6 +168,16 @@ public class TezLauncher extends Launche
SecondaryKeyOptimizerTez skOptimizer = new SecondaryKeyOptimizerTez(tezPlan);
skOptimizer.visit();
}
+
+ boolean isMultiQuery =
+ "true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
+
+ if (isMultiQuery) {
+ // reduces the number of TezOpers in the Tez plan generated
+ // by multi-query (multi-store) script.
+ MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan);
+ mqOptimizer.visit();
+ }
// Run AccumulatorOptimizer on Tez plan
boolean isAccum = Boolean.parseBoolean(pc.getProperties().getProperty(
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Mon Feb 17 08:26:52 2014
@@ -71,12 +71,8 @@ public class TezOperator extends Operato
// Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit
private OperatorKey splitOperatorKey = null;
- // This indicates that this TezOper has POSplit as a parent.
- // This is the case where multi-query is turned off.
- private OperatorKey splitParent = null;
-
// This indicates that this TezOper is a split operator
- private boolean isSplitOper;
+ private boolean splitter;
// Indicates that the plan creation is complete
boolean closed = false;
@@ -179,24 +175,12 @@ public class TezOperator extends Operato
this.splitOperatorKey = splitOperatorKey;
}
- public boolean isSplitSubPlan() {
- return splitOperatorKey != null;
- }
-
- public OperatorKey getSplitParent() {
- return splitParent;
- }
-
- public void setSplitParent(OperatorKey splitParent) {
- this.splitParent = splitParent;
- }
-
- public boolean isSplitOperator() {
- return isSplitOper;
+ /**
+ * Sets whether this TezOper is a split operator.
+ *
+ * @param spl true to mark this TezOper as a split operator
+ */
+ public void setSplitter(boolean spl) {
+ splitter = spl;
}
- public void setSplitOperator(boolean isSplitOperator) {
- this.isSplitOper = isSplitOperator;
+ /**
+ * @return true if this TezOper has been marked as a split operator
+ * via {@link #setSplitter(boolean)}
+ */
+ public boolean isSplitter() {
+ return splitter;
}
public boolean isClosed() {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Mon Feb 17 08:26:52 2014
@@ -3,9 +3,11 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map.Entry;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
@@ -76,12 +78,31 @@ public class TezCompilerUtil {
static public TezEdgeDescriptor connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
plan.connect(from, to);
+    setLeafOutputKey(from, to);
// Add edge descriptors to old and new operators
TezEdgeDescriptor edge = new TezEdgeDescriptor();
to.inEdges.put(from.getOperatorKey(), edge);
from.outEdges.put(to.getOperatorKey(), edge);
return edge;
}
+
+    /**
+     * Connects {@code from} to {@code to} in {@code plan} using a caller-supplied
+     * edge descriptor, registering it as {@code from}'s out-edge and as
+     * {@code to}'s in-edge.
+     *
+     * @throws PlanException if {@code plan} rejects the connection
+     */
+    static public void connect(TezOperPlan plan, TezOperator from, TezOperator to, TezEdgeDescriptor edge) throws PlanException {
+        plan.connect(from, to);
+        setLeafOutputKey(from, to);
+        // Add edge descriptors to old and new operators
+        to.inEdges.put(from.getOperatorKey(), edge);
+        from.outEdges.put(to.getOperatorKey(), edge);
+    }
+
+    /**
+     * If the first leaf of {@code from}'s plan is a POLocalRearrangeTez, points its
+     * output key at {@code to}. Other leaves (e.g. a POStoreTez in the sampling job
+     * of an order by) are left untouched, as is a plan that has no leaves yet.
+     * Shared by both {@code connect} overloads so they cannot drift apart.
+     */
+    static private void setLeafOutputKey(TezOperator from, TezOperator to) {
+        List<PhysicalOperator> leaves = from.plan.getLeaves();
+        if (leaves == null || leaves.isEmpty()) {
+            // Nothing to rewire; previously this threw IndexOutOfBoundsException.
+            return;
+        }
+        PhysicalOperator leaf = leaves.get(0);
+        if (leaf instanceof POLocalRearrangeTez) {
+            ((POLocalRearrangeTez) leaf).setOutputKey(to.getOperatorKey().toString());
+        }
+    }
static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) {
PhysicalPlan forEachPlan = new PhysicalPlan();
Added: pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java?rev=1568901&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/impl/builtin/ReadScalarsTez.java Mon Feb 17 08:26:52 2014
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.data.Tuple;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
+
+/**
+ * Returns fields of a single scalar row read from a Tez input.
+ * <p>
+ * The row is read once, in {@link #attachInputs}, from the input registered
+ * under {@code inputKey}; each {@link #exec} call then returns the field at
+ * the position given by its first argument. An empty input yields
+ * {@code null}; an input with more than one row is rejected.
+ */
+public class ReadScalarsTez extends EvalFunc<Object> implements TezLoad {
+    private static final Log LOG = LogFactory.getLog(ReadScalarsTez.class);
+
+    /** Key of the input (producing vertex) that carries the scalar row. */
+    String inputKey;
+
+    /** The scalar row; {@code null} until attached or if the input was empty. */
+    Tuple t;
+
+    /**
+     * @param inputKey key under which the scalar input is registered
+     */
+    public ReadScalarsTez(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    /**
+     * Reads the scalar row from {@code inputs.get(inputKey)}.
+     *
+     * @param inputs available inputs, looked up by {@code inputKey}
+     * @param conf not used
+     * @throws ExecException if the input is missing, has more than one row,
+     *         or cannot be read
+     */
+    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
+            throws ExecException {
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            BroadcastKVReader reader = (BroadcastKVReader) input.getReader();
+            if (reader.next()) {
+                t = (Tuple) reader.getCurrentValue();
+                // A scalar must be a single row; silently using the first of
+                // several would hide a mistake in the script.
+                if (reader.next()) {
+                    throw new ExecException("Scalar has more than one row in the output. "
+                            + "1st : " + t + ", 2nd :" + reader.getCurrentValue());
+                }
+            } else {
+                // Empty input: exec() returns null instead of failing with a
+                // NullPointerException on the missing row.
+                t = null;
+                LOG.info("Scalar input from vertex " + inputKey + " is empty");
+            }
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+        } catch (ExecException e) {
+            // Already descriptive; rethrow rather than wrap it again below.
+            throw e;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * @param input tuple whose first field is the (Integer) position to read
+     * @return the field of the scalar row at that position, or {@code null}
+     *         if the scalar input was empty
+     */
+    @Override
+    public Object exec(Tuple input) throws IOException {
+        if (t == null) {
+            return null;
+        }
+        int pos = (Integer) input.get(0);
+        return t.get(pos);
+    }
+}
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1568901&r1=1568900&r2=1568901&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Mon Feb 17 08:26:52 2014
@@ -98,9 +98,9 @@ public class ScalarVisitor extends AllEx
store.setTmpStore(true);
lp.add( store );
lp.connect( refOp, store );
- expr.setImplicitReferencedOperator(store);
}
-
+
+ expr.setImplicitReferencedOperator(store);
filenameConst.setValue( store.getOutputSpec().getFileName() );
if( lp.getSoftLinkSuccessors( store ) == null ||