You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/09/13 18:34:52 UTC

git commit: CRUNCH-54: Fix bug where we throw NPE when materializing a PCollection that has been written before

Updated Branches:
  refs/heads/master 7e8d63a20 -> c38a32bdf


CRUNCH-54: Fix bug where we throw NPE when materializing a PCollection that has been written before


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/c38a32bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/c38a32bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/c38a32bd

Branch: refs/heads/master
Commit: c38a32bdf600b56088345326f0ecc30b51d73778
Parents: 7e8d63a
Author: Josh Wills <jw...@apache.org>
Authored: Thu Sep 13 09:33:50 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Sep 13 09:33:50 2012 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/MRPipelineIT.java    |   55 +++++++++++++++
 .../crunch/impl/mr/collect/PCollectionImpl.java    |    7 ++-
 2 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/c38a32bd/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
new file mode 100644
index 0000000..e4ff91d
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.File;
+import java.io.Serializable;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class MRPipelineIT implements Serializable { // integration test for CRUNCH-54: write a PCollection after it has been materialized
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create(); // transient: not part of this class's serialized state
+
+  @Test
+  public void materializedColShouldBeWritten() throws Exception { // NOTE(review): class is Serializable presumably because the anonymous FilterFn below references it - confirm
+    File textFile = tmpDir.copyResourceFile("shakes.txt"); // input file obtained from the "shakes.txt" resource
+    Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> genericCollection = pipeline.readTextFile(textFile.getAbsolutePath());
+    pipeline.run(); // first run: only the text-file read has been declared at this point
+    PCollection<String> filter = genericCollection.filter("Filtering data", new FilterFn<String>() {
+      @Override
+      public boolean accept(String input) {
+        return true; // accepts every input unconditionally
+      }
+    });
+    filter.materialize(); // materialize first, so the later write() is called on an already-materialized collection
+    pipeline.run(); // second run, after the materialize request
+    File file = tmpDir.getFile("output.txt");
+    Target outFile = To.textFile(file.getAbsolutePath());
+    PCollection<String> write = filter.write(outFile); // write the previously materialized collection to a text-file target
+    write.materialize(); // materialize again, this time on the collection returned by write()
+    pipeline.run(); // no assertions follow: the test passes if none of the calls above throws
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/c38a32bd/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index d88da42..486b976 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -100,9 +100,12 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     return new DoTableImpl<K, V>(name, this, fn, type);
   }
 
-  @Override
   public PCollection<S> write(Target target) {
-    getPipeline().write(this, target);
+    if (materializedAt != null) { // a materialization source is already recorded for this collection
+      getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target); // write an InputCollection built on that source instead of this collection
+    } else { // nothing recorded: pass this collection itself to the pipeline, as before this change
+      getPipeline().write(this, target);
+    }
     return this;
   }