You are viewing a plain-text version of this content. A canonical link to it exists, but the hyperlink was not preserved in this plain-text rendering.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/29 18:51:26 UTC

crunch git commit: CRUNCH-493: Re-enable exception throwing on reading materialized PCollections after a pipeline failure

Repository: crunch
Updated Branches:
  refs/heads/master cbb1b7e75 -> 4f2b1f25f


CRUNCH-493: Re-enable exception throwing on reading materialized PCollections after a pipeline failure


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/4f2b1f25
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/4f2b1f25
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/4f2b1f25

Branch: refs/heads/master
Commit: 4f2b1f25fe21aca7c2df4747e754862d113173be
Parents: cbb1b7e
Author: Josh Wills <jw...@apache.org>
Authored: Wed Jan 28 12:41:03 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jan 29 09:15:36 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/MaterializeIT.java   | 54 ++++++++++++++------
 .../materialize/MaterializableIterable.java     |  2 +-
 2 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/4f2b1f25/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
index cb0f306..7bc61df 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.List;
 
 import com.google.common.collect.Iterables;
-import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.materialize.MaterializableIterable;
@@ -36,6 +35,7 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
@@ -70,27 +70,27 @@ public class MaterializeIT {
   }
 
   @Test
-  public void testMaterializeEmptyIntermediate_Writables() throws IOException {
+  public void testMaterializeEmptyIntermediate() throws IOException {
     runMaterializeEmptyIntermediate(
-        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
-        WritableTypeFamily.getInstance());
+        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()));
   }
 
   @Test
-  public void testMaterializeEmptyIntermediate_Avro() throws IOException {
-    runMaterializeEmptyIntermediate(
-        new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
-        AvroTypeFamily.getInstance());
+  public void testMaterializeEmptyIntermediate_InMemory() throws IOException {
+    runMaterializeEmptyIntermediate(MemPipeline.getInstance());
   }
 
-  @Test
-  public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
-    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  @Test(expected = CrunchRuntimeException.class)
+  public void testMaterializeFailure() throws IOException {
+    runMaterializeWithFailure(
+            new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()));
   }
 
   @Test
-  public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
-    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  public void testMaterializeNoFailure() throws IOException {
+    Configuration conf = tmpDir.getDefaultConfiguration();
+    conf.setBoolean("crunch.empty.materialize.on.failure", true);
+    runMaterializeWithFailure(new MRPipeline(MaterializeIT.class, conf));
   }
 
   public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
@@ -102,14 +102,38 @@ public class MaterializeIT {
     pipeline.done();
   }
 
-  public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
+  public void runMaterializeEmptyIntermediate(Pipeline pipeline)
       throws IOException {
     String inputPath = tmpDir.copyResourceFileName("set1.txt");
-    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL());
+    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FilterAll<String>(false));
     assertTrue(Iterables.isEmpty(empty.materialize()));
     pipeline.done();
   }
 
+  public void runMaterializeWithFailure(Pipeline pipeline) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("set1.txt");
+    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FilterAll<String>(true));
+    empty.materialize().iterator();
+    pipeline.done();
+  }
+
+  static class FilterAll<T> extends FilterFn<T> {
+
+    private final boolean throwException;
+
+    public FilterAll(boolean throwException) {
+      this.throwException = throwException;
+    }
+
+    @Override
+    public boolean accept(T input) {
+      if (throwException) {
+        throw new RuntimeException("This is an exception");
+      }
+      return false;
+    }
+  }
+
   static class StringToStringWrapperPersonPairMapFn extends MapFn<String, Pair<StringWrapper, Person>> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/4f2b1f25/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
index f83117f..232d0a1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -92,7 +92,7 @@ public class MaterializableIterable<E> implements Iterable<E> {
   public Iterator<E> iterator() {
     if (materialized == null) {
       this.result = pipeline.run();
-      if (result.succeeded()) {
+      if (result.succeeded() || !pipeline.getConfiguration().getBoolean("crunch.empty.materialize.on.failure", false)) {
         materialize();
       } else {
         LOG.error("Pipeline run failed, returning empty iterator");