You are viewing a plain text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/02 16:17:29 UTC
svn commit: r1584048 - in
/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez:
TezDagBuilder.java TezOperator.java TezPOPackageAnnotator.java
optimizers/UnionOptimizer.java
Author: rohini
Date: Wed Apr 2 14:17:29 2014
New Revision: 1584048
URL: http://svn.apache.org/r1584048
Log:
PIG-3835: Improve performance of union (rohini) - patch to fix exception in some cases
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1584048&r1=1584047&r2=1584048&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Apr 2 14:17:29 2014
@@ -168,7 +168,12 @@ public class TezDagBuilder extends TezOp
try {
if (pred.isVertexGroup()) {
VertexGroup from = pred.getVertexGroupInfo().getVertexGroup();
- GroupInputEdge edge = newGroupInputEdge(pred, tezOp, from, to);
+ // The plan of the vertex group is empty. Since the Edge is created based on
+ // the operators in the plan, refer to one of the vertex group members instead.
+ // Both the vertex group and its members reference the same EdgeDescriptor object
+ // for the successor.
+ GroupInputEdge edge = newGroupInputEdge(
+ getPlan().getOperator(pred.getVertexGroupMembers().get(0)), tezOp, from, to);
dag.addEdge(edge);
} else {
Vertex from = dag.getVertex(pred.getOperatorKey().toString());
@@ -430,7 +435,7 @@ public class TezDagBuilder extends TezOp
} else {
String inputKey = pred.getOperatorKey().toString();
if (pred.isVertexGroup()) {
- pred = mPlan.getOperator(pred.getVertexGroupPredecessors().get(0));
+ pred = mPlan.getOperator(pred.getVertexGroupMembers().get(0));
}
LinkedList<POLocalRearrangeTez> lrs =
PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
@@ -548,7 +553,10 @@ public class TezDagBuilder extends TezOp
MROutput.class.getName()).setUserPayload(TezUtils
.createUserPayloadFromConf(outputPayLoad));
if (tezOp.getVertexGroupStores() != null) {
- if (tezOp.getVertexGroupStores().containsKey(store.getOperatorKey())) {
+ OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
+ if (vertexGroupKey != null) {
+ getPlan().getOperator(vertexGroupKey).getVertexGroupInfo()
+ .setStoreOutputDescriptor(storeOutDescriptor);
continue;
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1584048&r1=1584047&r2=1584048&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Wed Apr 2 14:17:29 2014
@@ -131,7 +131,7 @@ public class TezOperator extends Operato
OPER_FEATURE feature = OPER_FEATURE.NONE;
- private List<OperatorKey> vertexGroupPredecessors;
+ private List<OperatorKey> vertexGroupMembers;
// For union
private VertexGroupInfo vertexGroupInfo;
// Mapping of OperatorKey of POStore OperatorKey to vertexGroup TezOperator
@@ -254,22 +254,22 @@ public class TezOperator extends Operato
}
public List<OperatorKey> getUnionPredecessors() {
- return vertexGroupPredecessors;
+ return vertexGroupMembers;
}
- public List<OperatorKey> getVertexGroupPredecessors() {
- return vertexGroupPredecessors;
+ public List<OperatorKey> getVertexGroupMembers() {
+ return vertexGroupMembers;
}
public void addUnionPredecessor(OperatorKey unionPredecessor) {
- if (vertexGroupPredecessors == null) {
- vertexGroupPredecessors = new ArrayList<OperatorKey>();
+ if (vertexGroupMembers == null) {
+ vertexGroupMembers = new ArrayList<OperatorKey>();
}
- this.vertexGroupPredecessors.add(unionPredecessor);
+ this.vertexGroupMembers.add(unionPredecessor);
}
- public void setVertexGroupPredecessors(List<OperatorKey> vertexGroupPredecessors) {
- this.vertexGroupPredecessors = vertexGroupPredecessors;
+ public void setVertexGroupMembers(List<OperatorKey> vertexGroupMembers) {
+ this.vertexGroupMembers = vertexGroupMembers;
}
// Union is the only operator that uses alias vertex (VertexGroup) now. But
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java?rev=1584048&r1=1584047&r2=1584048&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java Wed Apr 2 14:17:29 2014
@@ -68,7 +68,7 @@ public class TezPOPackageAnnotator exten
TezOperator predTezOp = it.next();
if (predTezOp.isVertexGroup()) {
// Just get one of the inputs to vertex group
- predTezOp = getPlan().getOperator(predTezOp.getVertexGroupPredecessors().get(0));
+ predTezOp = getPlan().getOperator(predTezOp.getVertexGroupMembers().get(0));
}
lrFound += patchPackage(predTezOp, pkgTezOp, pkg);
if(lrFound == pkg.getNumInps()) {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1584048&r1=1584047&r2=1584048&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Wed Apr 2 14:17:29 2014
@@ -75,7 +75,7 @@ public class UnionOptimizer extends TezO
// For now don't optimize
// Create a copy as disconnect while iterating modifies the original list
List<TezOperator> predecessors = new ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
- if (predecessors.size() > unionOp.getVertexGroupPredecessors().size()) {
+ if (predecessors.size() > unionOp.getVertexGroupMembers().size()) {
return;
}
@@ -87,7 +87,7 @@ public class UnionOptimizer extends TezO
for (int i=0; i < storeVertexGroupOps.length; i++) {
storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
- storeVertexGroupOps[i].setVertexGroupPredecessors(unionOp.getVertexGroupPredecessors());
+ storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
tezPlan.add(storeVertexGroupOps[i]);
}
@@ -111,7 +111,7 @@ public class UnionOptimizer extends TezO
outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
- outputVertexGroupOps[i].setVertexGroupPredecessors(unionOp.getVertexGroupPredecessors());
+ outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
newOutputKeys[i] = outputVertexGroupOps[i].getOperatorKey().toString();
tezPlan.add(outputVertexGroupOps[i]);
}
@@ -121,7 +121,7 @@ public class UnionOptimizer extends TezO
// Clone plan of union and merge it into the predecessor operators
// Remove POShuffledValueInputTez from union plan root
unionOpPlan.remove(unionOpPlan.getRoots().get(0));
- for (OperatorKey predKey : unionOp.getVertexGroupPredecessors()) {
+ for (OperatorKey predKey : unionOp.getVertexGroupMembers()) {
TezOperator pred = tezPlan.getOperator(predKey);
PhysicalPlan predPlan = pred.plan;
// Remove POValueOutputTez from predecessor leaf
@@ -143,7 +143,7 @@ public class UnionOptimizer extends TezO
clonedUnionStoreOutputs.get(i).setOutputKey(
storeVertexGroup.getVertexGroupInfo().getStore()
.getOperatorKey().toString());
- pred.addVertexGroupStore(unionStoreOutputs.get(i++).getOperatorKey(),
+ pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
storeVertexGroup.getOperatorKey());
tezPlan.connect(pred, storeVertexGroup);
}
@@ -156,13 +156,21 @@ public class UnionOptimizer extends TezO
tezPlan.disconnect(pred, unionOp);
}
- // Copy output edges of union -> successor to vertexgroup -> successor
- // and connect vertexgroup -> successor
+ // Copy output edges of union -> successor to predecessor->successor, vertexgroup -> successor
+ // and connect vertexgroup -> successor in the plan.
for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) {
TezOperator succOp = tezPlan.getOperator(entry.getKey());
TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
+ // Required to create the Edge in TezDAGBuilder
+ for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
+ TezOperator pred = tezPlan.getOperator(predKey);
+ pred.outEdges.put(entry.getKey(), entry.getValue());
+ succOp.inEdges.put(predKey, entry.getValue());
+ }
+ // Not used in TezDAGBuilder. Just setting for correctness.
vertexGroupOp.outEdges.put(entry.getKey(), entry.getValue());
succOp.inEdges.put(vertexGroupOp.getOperatorKey(), entry.getValue());
+
tezPlan.connect(vertexGroupOp, succOp);
}
} catch (Exception e) {