You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/20 03:45:43 UTC
git commit: CRUNCH-420: Change breakpointing logic around
PCollections that are materialized before GBK operations.
Repository: crunch
Updated Branches:
refs/heads/master 2ef8c1678 -> fd0bce36b
CRUNCH-420: Change breakpointing logic around PCollections that are materialized
before GBK operations.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fd0bce36
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fd0bce36
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fd0bce36
Branch: refs/heads/master
Commit: fd0bce36b918621ff45149e877848b14da3b8fed
Parents: 2ef8c16
Author: Josh Wills <jw...@apache.org>
Authored: Sun Jun 15 23:22:20 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jun 19 13:02:40 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/crunch/Breakpoint2IT.java | 106 +++++++++++++++++++
.../it/java/org/apache/crunch/BreakpointIT.java | 8 +-
.../org/apache/crunch/impl/mr/plan/Edge.java | 11 +-
.../apache/crunch/impl/mr/plan/MSCRPlanner.java | 33 ++++--
4 files changed, 141 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/fd0bce36/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java b/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java
new file mode 100644
index 0000000..4b76c8b
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.MRPipelineExecution;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class Breakpoint2IT {
+ private static final class PTableTransform extends DoFn<String, Pair<String, Integer>> {
+ @Override
+ public void process(final String s, final Emitter<Pair<String, Integer>> emitter) {
+ for (int i = 0; i < 10; i++) {
+ emitter.emit(new Pair<String, Integer>(s, i));
+ }
+ }
+ }
+
+ @Rule
+ public TemporaryPath tmpDir = TemporaryPaths.create();
+
+ @Test
+ public void testNoBreakpoint() throws Exception {
+ run(new MRPipeline(Breakpoint2IT.class, tmpDir.getDefaultConfiguration()),
+ tmpDir.copyResourceFileName("letters.txt"),
+ tmpDir.copyResourceFileName("urls.txt"),
+ tmpDir.copyResourceFileName("docs.txt"),
+ tmpDir.getFileName("out1"),
+ tmpDir.getFileName("out2"),
+ false);
+ }
+
+ @Test
+ public void testBreakpoint() throws Exception {
+ run(new MRPipeline(Breakpoint2IT.class, tmpDir.getDefaultConfiguration()),
+ tmpDir.copyResourceFileName("letters.txt"),
+ tmpDir.copyResourceFileName("urls.txt"),
+ tmpDir.copyResourceFileName("docs.txt"),
+ tmpDir.getFileName("out1"),
+ tmpDir.getFileName("out2"),
+ true);
+ }
+
+ public static void run(MRPipeline pipeline, String input1, String input2, String input3,
+ String out1, String out2, boolean breakpoint) throws Exception {
+ // Read a line from a file to get a PCollection.
+ PCollection<String> pCol1 = pipeline.read(From.textFile(input1));
+ PCollection<String> pCol2 = pipeline.read(From.textFile(input2));
+ PCollection<String> pCol3 = pipeline.read(From.textFile(input3));
+
+ // Create PTables from the PCollections
+ PTable<String, Integer> pTable1 = pCol1.parallelDo("Transform pCol1 to PTable", new PTableTransform(),
+ Writables.tableOf(Writables.strings(), Writables.ints()));
+ if (breakpoint) {
+ pTable1.materialize();
+ }
+
+ PTable<String, Integer> pTable2 = pCol2.parallelDo("Transform pCol2 to PTable", new PTableTransform(),
+ Writables.tableOf(Writables.strings(), Writables.ints()));
+ PTable<String, Integer> pTable3 = pCol3.parallelDo("Transform pCol3 to PTable", new PTableTransform(),
+ Writables.tableOf(Writables.strings(), Writables.ints()));
+
+ // Perform joins to pTable1
+ PTable<String, Pair<Integer, Integer>> join1 = Join.leftJoin(pTable1, pTable2);
+ PTable<String, Pair<Integer, Integer>> join2 = Join.rightJoin(pTable1, pTable3);
+
+ // Write joins
+ join1.keys().write(To.textFile(out1));
+ join2.keys().write(To.textFile(out2));
+
+ MRPipelineExecution exec = pipeline.runAsync();
+ int fnCount = 0;
+ for (String line : exec.getPlanDotFile().split("\n")) {
+ if (line.contains("label=\"Transform pCol1 to PTable\"")) {
+ fnCount++;
+ }
+ }
+ assertEquals(breakpoint ? 1 : 2, fnCount);
+ exec.waitUntilDone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/fd0bce36/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
index 790f049..8a21fa6 100644
--- a/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
@@ -18,6 +18,7 @@
package org.apache.crunch;
import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.test.TemporaryPath;
@@ -51,7 +52,7 @@ public class BreakpointIT {
true);
}
- public static void run(Pipeline pipeline, String input, String out1, String out2, boolean breakpoint)
+ public static void run(MRPipeline pipeline, String input, String out1, String out2, boolean breakpoint)
throws Exception {
// Read a line from a file to get a PCollection.
@@ -115,15 +116,14 @@ public class BreakpointIT {
// Write values
pGrpTable3.ungroup().write(To.textFile(out2));
- PipelineExecution pe = pipeline.runAsync();
+ MRExecutor exec = pipeline.plan();
// Count the number of map processing steps in this pipeline
int mapsCount = 0;
- for (String line : pe.getPlanDotFile().split("\n")) {
+ for (String line : exec.getPlanDotFile().split("\n")) {
if (line.contains(" subgraph ") && line.contains("-map\" {")) {
mapsCount++;
}
}
assertEquals(breakpoint ? 1 : 2, mapsCount);
- pe.waitUntilDone();
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/fd0bce36/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
index 67c624d..4006930 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;
@@ -64,7 +65,7 @@ class Edge {
return paths;
}
- public Map<NodePath, PCollectionImpl> getSplitPoints(Map<PCollectionImpl<?>, Set<Target>> outputs) {
+ public Map<NodePath, PCollectionImpl> getSplitPoints(boolean breakpointsOnly) {
List<NodePath> np = Lists.newArrayList(paths);
List<PCollectionImpl<?>> smallestOverallPerPath = Lists.newArrayListWithExpectedSize(np.size());
Map<PCollectionImpl<?>, Set<Integer>> pathCounts = Maps.newHashMap();
@@ -74,7 +75,7 @@ class Edge {
boolean breakpoint = false;
PCollectionImpl<?> best = null;
for (PCollectionImpl<?> pc : np.get(i)) {
- if (!(pc instanceof BaseGroupedTable)) {
+ if (!(pc instanceof BaseGroupedTable) && (!breakpointsOnly || pc.isBreakpoint())) {
if (pc.isBreakpoint()) {
if (!breakpoint || pc.getSize() < bestSize) {
best = pc;
@@ -105,7 +106,11 @@ class Edge {
missing.add(i);
}
}
- if (missing.isEmpty()) {
+
+ if (breakpointsOnly && missing.size() > 0) {
+ // We can't create new splits in this mode
+ return ImmutableMap.of();
+ } else if (missing.isEmpty()) {
return splitPoints;
} else {
// Need to either choose the smallest collection from each missing path,
http://git-wip-us.apache.org/repos/asf/crunch/blob/fd0bce36/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 72c431b..c9a6136 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -124,14 +124,13 @@ public class MSCRPlanner {
// job prototype a particular GBK is assigned to.
Multimap<Vertex, JobPrototype> newAssignments = HashMultimap.create();
for (List<Vertex> component : components) {
- newAssignments.putAll(constructJobPrototypes(component, components.size()));
+ newAssignments.putAll(constructJobPrototypes(component));
}
// Add in the job dependency information here.
for (Map.Entry<Vertex, JobPrototype> e : newAssignments.entries()) {
JobPrototype current = e.getValue();
- List<Vertex> parents = graph.getParents(e.getKey());
- for (Vertex parent : parents) {
+ for (Vertex parent : graph.getParents(e.getKey())) {
for (JobPrototype parentJobProto : newAssignments.get(parent)) {
current.addDependency(parentJobProto);
}
@@ -206,10 +205,8 @@ public class MSCRPlanner {
}
for (Edge e : baseGraph.getAllEdges()) {
- // Add back all of the edges where neither vertex is a GBK and we do not
- // have an output feeding into a GBK.
- if (!(e.getHead().isGBK() && e.getTail().isGBK()) &&
- !(e.getHead().isOutput() && e.getTail().isGBK())) {
+ // Add back all of the edges where neither vertex is a GBK.
+ if (!e.getHead().isGBK() && !e.getTail().isGBK()) {
Vertex head = graph.getVertexAt(e.getHead().getPCollection());
Vertex tail = graph.getVertexAt(e.getTail().getPCollection());
graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths());
@@ -239,7 +236,23 @@ public class MSCRPlanner {
graph.markDependency(splitHead, splitTail);
} else if (!e.getHead().isGBK()) {
Vertex newHead = graph.getVertexAt(e.getHead().getPCollection());
- graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
+ Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(true /* breakpoints only */);
+ if (splitPoints.isEmpty()) {
+ graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
+ } else {
+ for (Map.Entry<NodePath, PCollectionImpl> s : splitPoints.entrySet()) {
+ NodePath path = s.getKey();
+ PCollectionImpl split = s.getValue();
+ InputCollection<?> inputNode = handleSplitTarget(split);
+ Vertex splitTail = graph.addVertex(split, true);
+ Vertex splitHead = graph.addVertex(inputNode, false);
+ NodePath headPath = path.splitAt(split, splitHead.getPCollection());
+ graph.getEdge(newHead, splitTail).addNodePath(headPath);
+ graph.getEdge(splitHead, vertex).addNodePath(path);
+ // Note the dependency between the vertices in the graph.
+ graph.markDependency(splitHead, splitTail);
+ }
+ }
}
}
for (Edge e : baseVertex.getOutgoingEdges()) {
@@ -249,7 +262,7 @@ public class MSCRPlanner {
} else {
// Execute an Edge split
Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
- Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(outputs);
+ Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(false /* breakpoints only */);
for (Map.Entry<NodePath, PCollectionImpl> s : splitPoints.entrySet()) {
NodePath path = s.getKey();
PCollectionImpl split = s.getValue();
@@ -270,7 +283,7 @@ public class MSCRPlanner {
return graph;
}
- private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component, int numOfJobs) {
+ private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) {
Multimap<Vertex, JobPrototype> assignment = HashMultimap.create();
List<Vertex> gbks = Lists.newArrayList();
for (Vertex v : component) {