You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/10/17 15:37:52 UTC

git commit: CRUNCH-281: A proper fix for the issue originally handled by CRUNCH-237.

Updated Branches:
  refs/heads/master 9f53a5122 -> 9c42bab14


CRUNCH-281: A proper fix for the issue originally handled by CRUNCH-237.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9c42bab1
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9c42bab1
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9c42bab1

Branch: refs/heads/master
Commit: 9c42bab14faae4883ceccae23cfe9872d33213a4
Parents: 9f53a51
Author: Josh Wills <jw...@apache.org>
Authored: Mon Oct 14 22:59:42 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Oct 17 06:35:46 2013 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 32 ++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/9c42bab1/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index f765313..1e0793c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -78,7 +78,7 @@ public class MSCRPlanner {
       targetDeps.put(pcollect, pcollect.getTargetDependencies());
     }
     
-    Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
+    Multimap<Target, JobPrototype> assignments = HashMultimap.create();
     while (!targetDeps.isEmpty()) {
       Set<Target> allTargets = Sets.newHashSet();
       for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
@@ -89,13 +89,11 @@ public class MSCRPlanner {
       // Walk the current plan tree and build a graph in which the vertices are
       // sources, targets, and GBK operations.
       Set<PCollectionImpl<?>> currentStage = Sets.newHashSet();
-      Set<PCollectionImpl<?>> laterStage = Sets.newHashSet();
       for (PCollectionImpl<?> output : targetDeps.keySet()) {
-        if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) {
+        Set<Target> deps = Sets.intersection(allTargets, targetDeps.get(output));
+        if (deps.isEmpty()) {
           graphBuilder.visitOutput(output);
           currentStage.add(output);
-        } else {
-          laterStage.add(output);
         }
       }
       
@@ -127,15 +125,25 @@ public class MSCRPlanner {
         }
       }
 
-      // Make all of the jobs in this stage dependent on existing job
-      // prototypes.
-      for (JobPrototype newPrototype : newAssignments.values()) {
-        for (JobPrototype oldPrototype : assignments.values()) {
-          newPrototype.addDependency(oldPrototype);
+      for (Map.Entry<Vertex, JobPrototype> e : newAssignments.entries()) {
+        if (e.getKey().isOutput()) {
+          PCollectionImpl<?> pcollect = e.getKey().getPCollection();
+          JobPrototype current = e.getValue();
+
+          // Add in implicit dependencies via SourceTargets that are read into memory
+          for (Target pt : pcollect.getTargetDependencies()) {
+            for (JobPrototype parentJobProto : assignments.get(pt)) {
+              current.addDependency(parentJobProto);
+            }
+          }
+
+          // Add this to the set of output assignments
+          for (Target t : outputs.get(pcollect)) {
+            assignments.put(t, e.getValue());
+          }
         }
       }
-      assignments.putAll(newAssignments);
-      
+
       // Remove completed outputs and mark materialized output locations
       // for subsequent job processing.
       for (PCollectionImpl<?> output : currentStage) {