You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/09/13 18:34:52 UTC
git commit: CRUNCH-54: Fix bug where we throw NPE when materializing
a PCollection that has been written before
Updated Branches:
refs/heads/master 7e8d63a20 -> c38a32bdf
CRUNCH-54: Fix bug where we throw NPE when materializing a PCollection that has been written before
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/c38a32bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/c38a32bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/c38a32bd
Branch: refs/heads/master
Commit: c38a32bdf600b56088345326f0ecc30b51d73778
Parents: 7e8d63a
Author: Josh Wills <jw...@apache.org>
Authored: Thu Sep 13 09:33:50 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Sep 13 09:33:50 2012 -0700
----------------------------------------------------------------------
.../it/java/org/apache/crunch/MRPipelineIT.java | 55 +++++++++++++++
.../crunch/impl/mr/collect/PCollectionImpl.java | 7 ++-
2 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/c38a32bd/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
new file mode 100644
index 0000000..e4ff91d
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.File;
+import java.io.Serializable;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class MRPipelineIT implements Serializable { // Integration test for CRUNCH-54: writing a PCollection after it has already been materialized.
+ @Rule
+ public transient TemporaryPath tmpDir = TemporaryPaths.create(); // transient: left out of serialization of this Serializable test class (presumably needed because the anonymous FilterFn below captures the enclosing instance — confirm)
+
+ @Test
+ public void materializedColShouldBeWritten() throws Exception {
+ File textFile = tmpDir.copyResourceFile("shakes.txt"); // input fixture copied into the temporary directory
+ Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<String> genericCollection = pipeline.readTextFile(textFile.getAbsolutePath());
+ pipeline.run(); // first run, before the filter is attached
+ PCollection<String> filter = genericCollection.filter("Filtering data", new FilterFn<String>() {
+ @Override
+ public boolean accept(String input) {
+ return true; // accepts every input; the filter exists only to create a derived collection
+ }
+ });
+ filter.materialize(); // materialize BEFORE writing — this ordering is the scenario under test
+ pipeline.run();
+ File file = tmpDir.getFile("output.txt");
+ Target outFile = To.textFile(file.getAbsolutePath());
+ PCollection<String> write = filter.write(outFile); // write() on the already-materialized collection
+ write.materialize();
+ pipeline.run(); // NOTE(review): no assertions follow — the test passes as long as none of the calls above throws
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/c38a32bd/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index d88da42..486b976 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -100,9 +100,12 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
return new DoTableImpl<K, V>(name, this, fn, type);
}
- @Override
public PCollection<S> write(Target target) {
- getPipeline().write(this, target);
+ if (materializedAt != null) { // already materialized: write from the materialized location instead of this collection (the CRUNCH-54 fix). NOTE(review): this hunk also removes @Override from write() — confirm that is intentional
+ getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target); // NOTE(review): unchecked downcast assumes getPipeline() is an MRPipeline — confirm
+ } else { // not yet materialized
+ getPipeline().write(this, target); // same call as before the change
+ }
return this;
}