You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/20 03:45:43 UTC

git commit: CRUNCH-420: Change breakpointing logic around PCollections that are materialized before group-by-key (GBK) operations.

Repository: crunch
Updated Branches:
  refs/heads/master 2ef8c1678 -> fd0bce36b


CRUNCH-420: Change breakpointing logic around PCollections that are materialized
before GBK operations.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fd0bce36
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fd0bce36
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fd0bce36

Branch: refs/heads/master
Commit: fd0bce36b918621ff45149e877848b14da3b8fed
Parents: 2ef8c16
Author: Josh Wills <jw...@apache.org>
Authored: Sun Jun 15 23:22:20 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jun 19 13:02:40 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/Breakpoint2IT.java   | 106 +++++++++++++++++++
 .../it/java/org/apache/crunch/BreakpointIT.java |   8 +-
 .../org/apache/crunch/impl/mr/plan/Edge.java    |  11 +-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  33 ++++--
 4 files changed, 141 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/fd0bce36/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java b/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java
new file mode 100644
index 0000000..4b76c8b
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.MRPipelineExecution;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class Breakpoint2IT {
+  private static final class PTableTransform extends DoFn<String, Pair<String, Integer>> {
+    @Override
+    public void process(final String s, final Emitter<Pair<String, Integer>> emitter) {
+      for (int i = 0; i < 10; i++) {
+        emitter.emit(new Pair<String, Integer>(s, i));
+      }
+    }
+  }
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testNoBreakpoint() throws Exception {
+    run(new MRPipeline(Breakpoint2IT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourceFileName("letters.txt"),
+        tmpDir.copyResourceFileName("urls.txt"),
+        tmpDir.copyResourceFileName("docs.txt"),
+        tmpDir.getFileName("out1"),
+        tmpDir.getFileName("out2"),
+        false);
+  }
+
+  @Test
+  public void testBreakpoint() throws Exception {
+    run(new MRPipeline(Breakpoint2IT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourceFileName("letters.txt"),
+        tmpDir.copyResourceFileName("urls.txt"),
+        tmpDir.copyResourceFileName("docs.txt"),
+        tmpDir.getFileName("out1"),
+        tmpDir.getFileName("out2"),
+        true);
+  }
+
+  public static void run(MRPipeline pipeline, String input1, String input2, String input3,
+                         String out1, String out2, boolean breakpoint) throws Exception {
+    // Read a line from a file to get a PCollection.
+    PCollection<String> pCol1 = pipeline.read(From.textFile(input1));
+    PCollection<String> pCol2 = pipeline.read(From.textFile(input2));
+    PCollection<String> pCol3 = pipeline.read(From.textFile(input3));
+
+    // Create PTables from the PCollections
+    PTable<String, Integer> pTable1 = pCol1.parallelDo("Transform pCol1 to PTable", new PTableTransform(),
+        Writables.tableOf(Writables.strings(), Writables.ints()));
+    if (breakpoint) {
+      pTable1.materialize();
+    }
+
+    PTable<String, Integer> pTable2 = pCol2.parallelDo("Transform pCol2 to PTable", new PTableTransform(),
+        Writables.tableOf(Writables.strings(), Writables.ints()));
+    PTable<String, Integer> pTable3 = pCol3.parallelDo("Transform pCol3 to PTable", new PTableTransform(),
+        Writables.tableOf(Writables.strings(), Writables.ints()));
+
+    // Perform joins to pTable1
+    PTable<String, Pair<Integer, Integer>> join1 = Join.leftJoin(pTable1, pTable2);
+    PTable<String, Pair<Integer, Integer>> join2 = Join.rightJoin(pTable1, pTable3);
+
+    // Write joins
+    join1.keys().write(To.textFile(out1));
+    join2.keys().write(To.textFile(out2));
+
+    MRPipelineExecution exec = pipeline.runAsync();
+    int fnCount = 0;
+    for (String line : exec.getPlanDotFile().split("\n")) {
+      if (line.contains("label=\"Transform pCol1 to PTable\"")) {
+        fnCount++;
+      }
+    }
+    assertEquals(breakpoint ? 1 : 2, fnCount);
+    exec.waitUntilDone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd0bce36/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
index 790f049..8a21fa6 100644
--- a/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
@@ -18,6 +18,7 @@
 package org.apache.crunch;
 
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
 import org.apache.crunch.io.From;
 import org.apache.crunch.io.To;
 import org.apache.crunch.test.TemporaryPath;
@@ -51,7 +52,7 @@ public class BreakpointIT {
         true);
   }
 
-  public static void run(Pipeline pipeline, String input, String out1, String out2, boolean breakpoint)
+  public static void run(MRPipeline pipeline, String input, String out1, String out2, boolean breakpoint)
       throws Exception {
 
     // Read a line from a file to get a PCollection.
@@ -115,15 +116,14 @@ public class BreakpointIT {
     // Write values
     pGrpTable3.ungroup().write(To.textFile(out2));
 
-    PipelineExecution pe = pipeline.runAsync();
+    MRExecutor exec = pipeline.plan();
     // Count the number of map processing steps in this pipeline
     int mapsCount = 0;
-    for (String line : pe.getPlanDotFile().split("\n")) {
+    for (String line : exec.getPlanDotFile().split("\n")) {
       if (line.contains(" subgraph ") && line.contains("-map\" {")) {
         mapsCount++;
       }
     }
     assertEquals(breakpoint ? 1 : 2, mapsCount);
-    pe.waitUntilDone();
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd0bce36/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
index 67c624d..4006930 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ReflectionToStringBuilder;
@@ -64,7 +65,7 @@ class Edge {
     return paths;
   }
 
-  public Map<NodePath,  PCollectionImpl> getSplitPoints(Map<PCollectionImpl<?>, Set<Target>> outputs) {
+  public Map<NodePath,  PCollectionImpl> getSplitPoints(boolean breakpointsOnly) {
     List<NodePath> np = Lists.newArrayList(paths);
     List<PCollectionImpl<?>> smallestOverallPerPath = Lists.newArrayListWithExpectedSize(np.size());
     Map<PCollectionImpl<?>, Set<Integer>> pathCounts = Maps.newHashMap();
@@ -74,7 +75,7 @@ class Edge {
       boolean breakpoint = false;
       PCollectionImpl<?> best = null;
       for (PCollectionImpl<?> pc : np.get(i)) {
-        if (!(pc instanceof BaseGroupedTable)) {
+        if (!(pc instanceof BaseGroupedTable) && (!breakpointsOnly || pc.isBreakpoint())) {
           if (pc.isBreakpoint()) {
             if (!breakpoint || pc.getSize() < bestSize) {
               best = pc;
@@ -105,7 +106,11 @@ class Edge {
         missing.add(i);
       }
     }
-    if (missing.isEmpty()) {
+
+    if (breakpointsOnly && missing.size() > 0) {
+      // We can't create new splits in this mode
+      return ImmutableMap.of();
+    } else if (missing.isEmpty()) {
       return splitPoints;
     } else {
       // Need to either choose the smallest collection from each missing path,

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd0bce36/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 72c431b..c9a6136 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -124,14 +124,13 @@ public class MSCRPlanner {
       // job prototype a particular GBK is assigned to.
       Multimap<Vertex, JobPrototype> newAssignments = HashMultimap.create();
       for (List<Vertex> component : components) {
-        newAssignments.putAll(constructJobPrototypes(component, components.size()));
+        newAssignments.putAll(constructJobPrototypes(component));
       }
 
       // Add in the job dependency information here.
       for (Map.Entry<Vertex, JobPrototype> e : newAssignments.entries()) {
         JobPrototype current = e.getValue();
-        List<Vertex> parents = graph.getParents(e.getKey());
-        for (Vertex parent : parents) {
+        for (Vertex parent : graph.getParents(e.getKey())) {
           for (JobPrototype parentJobProto : newAssignments.get(parent)) {
             current.addDependency(parentJobProto);
           }
@@ -206,10 +205,8 @@ public class MSCRPlanner {
     }
     
     for (Edge e : baseGraph.getAllEdges()) {
-      // Add back all of the edges where neither vertex is a GBK and we do not
-      // have an output feeding into a GBK.
-      if (!(e.getHead().isGBK() && e.getTail().isGBK()) &&
-          !(e.getHead().isOutput() && e.getTail().isGBK())) {
+      // Add back all of the edges where neither vertex is a GBK.
+      if (!e.getHead().isGBK() && !e.getTail().isGBK()) {
         Vertex head = graph.getVertexAt(e.getHead().getPCollection());
         Vertex tail = graph.getVertexAt(e.getTail().getPCollection());
         graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths());
@@ -239,7 +236,23 @@ public class MSCRPlanner {
             graph.markDependency(splitHead, splitTail);
           } else if (!e.getHead().isGBK()) {
             Vertex newHead = graph.getVertexAt(e.getHead().getPCollection());
-            graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
+            Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(true /* breakpoints only  */);
+            if (splitPoints.isEmpty()) {
+              graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
+            } else {
+              for (Map.Entry<NodePath, PCollectionImpl> s : splitPoints.entrySet()) {
+                NodePath path = s.getKey();
+                PCollectionImpl split = s.getValue();
+                InputCollection<?> inputNode = handleSplitTarget(split);
+                Vertex splitTail = graph.addVertex(split, true);
+                Vertex splitHead = graph.addVertex(inputNode, false);
+                NodePath headPath = path.splitAt(split, splitHead.getPCollection());
+                graph.getEdge(newHead, splitTail).addNodePath(headPath);
+                graph.getEdge(splitHead, vertex).addNodePath(path);
+                // Note the dependency between the vertices in the graph.
+                graph.markDependency(splitHead, splitTail);
+              }
+            }
           }
         }
         for (Edge e : baseVertex.getOutgoingEdges()) {
@@ -249,7 +262,7 @@ public class MSCRPlanner {
           } else {
             // Execute an Edge split
             Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
-            Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(outputs);
+            Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(false /* breakpoints only */);
             for (Map.Entry<NodePath, PCollectionImpl> s : splitPoints.entrySet()) {
               NodePath path = s.getKey();
               PCollectionImpl split = s.getValue();
@@ -270,7 +283,7 @@ public class MSCRPlanner {
     return graph;
   }
   
-  private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component, int numOfJobs) {
+  private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) {
     Multimap<Vertex, JobPrototype> assignment = HashMultimap.create();
     List<Vertex> gbks = Lists.newArrayList();
     for (Vertex v : component) {