You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/10/17 15:37:52 UTC
git commit: CRUNCH-281: A proper fix for the issue originally handled by CRUNCH-237.
Updated Branches:
refs/heads/master 9f53a5122 -> 9c42bab14
CRUNCH-281: A proper fix for the issue originally handled by CRUNCH-237.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9c42bab1
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9c42bab1
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9c42bab1
Branch: refs/heads/master
Commit: 9c42bab14faae4883ceccae23cfe9872d33213a4
Parents: 9f53a51
Author: Josh Wills <jw...@apache.org>
Authored: Mon Oct 14 22:59:42 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Oct 17 06:35:46 2013 -0700
----------------------------------------------------------------------
.../apache/crunch/impl/mr/plan/MSCRPlanner.java | 32 ++++++++++++--------
1 file changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/9c42bab1/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index f765313..1e0793c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -78,7 +78,7 @@ public class MSCRPlanner {
targetDeps.put(pcollect, pcollect.getTargetDependencies());
}
- Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
+ Multimap<Target, JobPrototype> assignments = HashMultimap.create();
while (!targetDeps.isEmpty()) {
Set<Target> allTargets = Sets.newHashSet();
for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
@@ -89,13 +89,11 @@ public class MSCRPlanner {
// Walk the current plan tree and build a graph in which the vertices are
// sources, targets, and GBK operations.
Set<PCollectionImpl<?>> currentStage = Sets.newHashSet();
- Set<PCollectionImpl<?>> laterStage = Sets.newHashSet();
for (PCollectionImpl<?> output : targetDeps.keySet()) {
- if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) {
+ Set<Target> deps = Sets.intersection(allTargets, targetDeps.get(output));
+ if (deps.isEmpty()) {
graphBuilder.visitOutput(output);
currentStage.add(output);
- } else {
- laterStage.add(output);
}
}
@@ -127,15 +125,25 @@ public class MSCRPlanner {
}
}
- // Make all of the jobs in this stage dependent on existing job
- // prototypes.
- for (JobPrototype newPrototype : newAssignments.values()) {
- for (JobPrototype oldPrototype : assignments.values()) {
- newPrototype.addDependency(oldPrototype);
+ for (Map.Entry<Vertex, JobPrototype> e : newAssignments.entries()) {
+ if (e.getKey().isOutput()) {
+ PCollectionImpl<?> pcollect = e.getKey().getPCollection();
+ JobPrototype current = e.getValue();
+
+ // Add in implicit dependencies via SourceTargets that are read into memory
+ for (Target pt : pcollect.getTargetDependencies()) {
+ for (JobPrototype parentJobProto : assignments.get(pt)) {
+ current.addDependency(parentJobProto);
+ }
+ }
+
+ // Add this to the set of output assignments
+ for (Target t : outputs.get(pcollect)) {
+ assignments.put(t, e.getValue());
+ }
}
}
- assignments.putAll(newAssignments);
-
+
// Remove completed outputs and mark materialized output locations
// for subsequent job processing.
for (PCollectionImpl<?> output : currentStage) {