You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/02 16:17:29 UTC

svn commit: r1584048 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez: TezDagBuilder.java TezOperator.java TezPOPackageAnnotator.java optimizers/UnionOptimizer.java

Author: rohini
Date: Wed Apr  2 14:17:29 2014
New Revision: 1584048

URL: http://svn.apache.org/r1584048
Log:
PIG-3835: Improve performance of union (rohini) - patch to fix exception in some cases

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1584048&r1=1584047&r2=1584048&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Apr  2 14:17:29 2014
@@ -168,7 +168,12 @@ public class TezDagBuilder extends TezOp
                 try {
                     if (pred.isVertexGroup()) {
                         VertexGroup from = pred.getVertexGroupInfo().getVertexGroup();
-                        GroupInputEdge edge = newGroupInputEdge(pred, tezOp, from, to);
+                        // The plan of a vertex group is empty. Since we create the Edge based on
+                        // some of the operators in the plan, refer to one of the vertex group members.
+                        // Both the vertex group and its members reference the same EdgeDescriptor
+                        // object to the successor.
+                        GroupInputEdge edge = newGroupInputEdge(
+                                getPlan().getOperator(pred.getVertexGroupMembers().get(0)), tezOp, from, to);
                         dag.addEdge(edge);
                     } else {
                         Vertex from = dag.getVertex(pred.getOperatorKey().toString());
@@ -430,7 +435,7 @@ public class TezDagBuilder extends TezOp
                 } else {
                     String inputKey = pred.getOperatorKey().toString();
                     if (pred.isVertexGroup()) {
-                        pred = mPlan.getOperator(pred.getVertexGroupPredecessors().get(0));
+                        pred = mPlan.getOperator(pred.getVertexGroupMembers().get(0));
                     }
                     LinkedList<POLocalRearrangeTez> lrs =
                             PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
@@ -548,7 +553,10 @@ public class TezDagBuilder extends TezOp
                     MROutput.class.getName()).setUserPayload(TezUtils
                     .createUserPayloadFromConf(outputPayLoad));
             if (tezOp.getVertexGroupStores() != null) {
-                if (tezOp.getVertexGroupStores().containsKey(store.getOperatorKey())) {
+                OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
+                if (vertexGroupKey != null) {
+                    getPlan().getOperator(vertexGroupKey).getVertexGroupInfo()
+                            .setStoreOutputDescriptor(storeOutDescriptor);
                     continue;
                 }
             }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1584048&r1=1584047&r2=1584048&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Wed Apr  2 14:17:29 2014
@@ -131,7 +131,7 @@ public class TezOperator extends Operato
 
     OPER_FEATURE feature = OPER_FEATURE.NONE;
 
-    private List<OperatorKey> vertexGroupPredecessors;
+    private List<OperatorKey> vertexGroupMembers;
     // For union
     private VertexGroupInfo vertexGroupInfo;
     // Mapping of OperatorKey of POStore OperatorKey to vertexGroup TezOperator
@@ -254,22 +254,22 @@ public class TezOperator extends Operato
     }
 
     public List<OperatorKey> getUnionPredecessors() {
-        return vertexGroupPredecessors;
+        return vertexGroupMembers;
     }
 
-    public List<OperatorKey> getVertexGroupPredecessors() {
-        return vertexGroupPredecessors;
+    public List<OperatorKey> getVertexGroupMembers() {
+        return vertexGroupMembers;
     }
 
     public void addUnionPredecessor(OperatorKey unionPredecessor) {
-        if (vertexGroupPredecessors == null) {
-            vertexGroupPredecessors = new ArrayList<OperatorKey>();
+        if (vertexGroupMembers == null) {
+            vertexGroupMembers = new ArrayList<OperatorKey>();
         }
-        this.vertexGroupPredecessors.add(unionPredecessor);
+        this.vertexGroupMembers.add(unionPredecessor);
     }
 
-    public void setVertexGroupPredecessors(List<OperatorKey> vertexGroupPredecessors) {
-        this.vertexGroupPredecessors = vertexGroupPredecessors;
+    public void setVertexGroupMembers(List<OperatorKey> vertexGroupMembers) {
+        this.vertexGroupMembers = vertexGroupMembers;
     }
 
     // Union is the only operator that uses alias vertex (VertexGroup) now. But

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java?rev=1584048&r1=1584047&r2=1584048&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java Wed Apr  2 14:17:29 2014
@@ -68,7 +68,7 @@ public class TezPOPackageAnnotator exten
             TezOperator predTezOp = it.next();
             if (predTezOp.isVertexGroup()) {
                 // Just get one of the inputs to vertex group
-                predTezOp = getPlan().getOperator(predTezOp.getVertexGroupPredecessors().get(0));
+                predTezOp = getPlan().getOperator(predTezOp.getVertexGroupMembers().get(0));
             }
             lrFound += patchPackage(predTezOp, pkgTezOp, pkg);
             if(lrFound == pkg.getNumInps()) {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1584048&r1=1584047&r2=1584048&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Wed Apr  2 14:17:29 2014
@@ -75,7 +75,7 @@ public class UnionOptimizer extends TezO
         // For now don't optimize
         // Create a copy as disconnect while iterating modifies the original list
         List<TezOperator> predecessors = new ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
-        if (predecessors.size() > unionOp.getVertexGroupPredecessors().size()) {
+        if (predecessors.size() > unionOp.getVertexGroupMembers().size()) {
             return;
         }
 
@@ -87,7 +87,7 @@ public class UnionOptimizer extends TezO
         for (int i=0; i < storeVertexGroupOps.length; i++) {
             storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
             storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
-            storeVertexGroupOps[i].setVertexGroupPredecessors(unionOp.getVertexGroupPredecessors());
+            storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
             tezPlan.add(storeVertexGroupOps[i]);
         }
 
@@ -111,7 +111,7 @@ public class UnionOptimizer extends TezO
             outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
             outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
             outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
-            outputVertexGroupOps[i].setVertexGroupPredecessors(unionOp.getVertexGroupPredecessors());
+            outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
             newOutputKeys[i] = outputVertexGroupOps[i].getOperatorKey().toString();
             tezPlan.add(outputVertexGroupOps[i]);
         }
@@ -121,7 +121,7 @@ public class UnionOptimizer extends TezO
              // Clone plan of union and merge it into the predecessor operators
              // Remove POShuffledValueInputTez from union plan root
             unionOpPlan.remove(unionOpPlan.getRoots().get(0));
-            for (OperatorKey predKey : unionOp.getVertexGroupPredecessors()) {
+            for (OperatorKey predKey : unionOp.getVertexGroupMembers()) {
                 TezOperator pred = tezPlan.getOperator(predKey);
                 PhysicalPlan predPlan = pred.plan;
                 // Remove POValueOutputTez from predecessor leaf
@@ -143,7 +143,7 @@ public class UnionOptimizer extends TezO
                     clonedUnionStoreOutputs.get(i).setOutputKey(
                             storeVertexGroup.getVertexGroupInfo().getStore()
                                     .getOperatorKey().toString());
-                    pred.addVertexGroupStore(unionStoreOutputs.get(i++).getOperatorKey(),
+                    pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
                             storeVertexGroup.getOperatorKey());
                     tezPlan.connect(pred, storeVertexGroup);
                 }
@@ -156,13 +156,21 @@ public class UnionOptimizer extends TezO
                 tezPlan.disconnect(pred, unionOp);
             }
 
-            // Copy output edges of union -> successor to vertexgroup -> successor
-            // and connect vertexgroup -> successor
+            // Copy output edges of union -> successor to both predecessor -> successor and
+            // vertexgroup -> successor, and connect vertexgroup -> successor in the plan.
             for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) {
                 TezOperator succOp = tezPlan.getOperator(entry.getKey());
                 TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
+                // Required to create the Edge in TezDagBuilder
+                for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
+                    TezOperator pred = tezPlan.getOperator(predKey);
+                    pred.outEdges.put(entry.getKey(), entry.getValue());
+                    succOp.inEdges.put(predKey, entry.getValue());
+                }
+                // Not used in TezDagBuilder. Just setting for correctness.
                 vertexGroupOp.outEdges.put(entry.getKey(), entry.getValue());
                 succOp.inEdges.put(vertexGroupOp.getOperatorKey(), entry.getValue());
+
                 tezPlan.connect(vertexGroupOp, succOp);
             }
         } catch (Exception e) {