You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/29 18:51:26 UTC
crunch git commit: CRUNCH-493: Re-enable exception throwing on reading materialized PCollections after a pipeline failure
Repository: crunch
Updated Branches:
refs/heads/master cbb1b7e75 -> 4f2b1f25f
CRUNCH-493: Re-enable exception throwing on reading materialized PCollections after a pipeline failure
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/4f2b1f25
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/4f2b1f25
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/4f2b1f25
Branch: refs/heads/master
Commit: 4f2b1f25fe21aca7c2df4747e754862d113173be
Parents: cbb1b7e
Author: Josh Wills <jw...@apache.org>
Authored: Wed Jan 28 12:41:03 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jan 29 09:15:36 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/crunch/MaterializeIT.java | 54 ++++++++++++++------
.../materialize/MaterializableIterable.java | 2 +-
2 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/4f2b1f25/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
index cb0f306..7bc61df 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.List;
import com.google.common.collect.Iterables;
-import org.apache.crunch.fn.FilterFns;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.materialize.MaterializableIterable;
@@ -36,6 +35,7 @@ import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
@@ -70,27 +70,27 @@ public class MaterializeIT {
}
@Test
- public void testMaterializeEmptyIntermediate_Writables() throws IOException {
+ public void testMaterializeEmptyIntermediate() throws IOException {
runMaterializeEmptyIntermediate(
- new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
- WritableTypeFamily.getInstance());
+ new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()));
}
@Test
- public void testMaterializeEmptyIntermediate_Avro() throws IOException {
- runMaterializeEmptyIntermediate(
- new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()),
- AvroTypeFamily.getInstance());
+ public void testMaterializeEmptyIntermediate_InMemory() throws IOException {
+ runMaterializeEmptyIntermediate(MemPipeline.getInstance());
}
- @Test
- public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
- runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+ @Test(expected = CrunchRuntimeException.class)
+ public void testMaterializeFailure() throws IOException {
+ runMaterializeWithFailure(
+ new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()));
}
@Test
- public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
- runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+ public void testMaterializeNoFailure() throws IOException {
+ Configuration conf = tmpDir.getDefaultConfiguration();
+ conf.setBoolean("crunch.empty.materialize.on.failure", true);
+ runMaterializeWithFailure(new MRPipeline(MaterializeIT.class, conf));
}
public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
@@ -102,14 +102,38 @@ public class MaterializeIT {
pipeline.done();
}
- public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
+ public void runMaterializeEmptyIntermediate(Pipeline pipeline)
throws IOException {
String inputPath = tmpDir.copyResourceFileName("set1.txt");
- PCollection<String> empty = pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL());
+ PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FilterAll<String>(false));
assertTrue(Iterables.isEmpty(empty.materialize()));
pipeline.done();
}
+ public void runMaterializeWithFailure(Pipeline pipeline) throws IOException {
+ String inputPath = tmpDir.copyResourceFileName("set1.txt");
+ PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FilterAll<String>(true));
+ empty.materialize().iterator();
+ pipeline.done();
+ }
+
+ static class FilterAll<T> extends FilterFn<T> {
+
+ private final boolean throwException;
+
+ public FilterAll(boolean throwException) {
+ this.throwException = throwException;
+ }
+
+ @Override
+ public boolean accept(T input) {
+ if (throwException) {
+ throw new RuntimeException("This is an exception");
+ }
+ return false;
+ }
+ }
+
static class StringToStringWrapperPersonPairMapFn extends MapFn<String, Pair<StringWrapper, Person>> {
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/4f2b1f25/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
index f83117f..232d0a1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -92,7 +92,7 @@ public class MaterializableIterable<E> implements Iterable<E> {
public Iterator<E> iterator() {
if (materialized == null) {
this.result = pipeline.run();
- if (result.succeeded()) {
+ if (result.succeeded() || !pipeline.getConfiguration().getBoolean("crunch.empty.materialize.on.failure", false)) {
materialize();
} else {
LOG.error("Pipeline run failed, returning empty iterator");