You are viewing a plain-text version of this content. The link to the canonical version (a "here" hyperlink in the original page) is not preserved in this plain-text rendering.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/01/26 02:24:32 UTC

git commit: CRUNCH-144: Convert Targets to SourceTargets wherever possible to mark a PCollection as materialized, and update the rules on converting a text file target to SourceTargets.

Updated Branches:
  refs/heads/master 5cf8142ae -> 9a1c42760


CRUNCH-144: Convert Targets to SourceTargets wherever possible to mark a PCollection as materialized,
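and update the rules for converting a text file target to a SourceTarget: the conversion now happens only for
text-compatible PTypes (an Avro string type, a Writable Text type, or a Writable table whose key and value are both Text).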


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/9a1c4276
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/9a1c4276
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/9a1c4276

Branch: refs/heads/master
Commit: 9a1c42760b5c01a5d1dbdf6ee6bdc4f9fb8ca086
Parents: 5cf8142
Author: Josh Wills <jw...@apache.org>
Authored: Wed Jan 16 07:42:10 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 25 15:47:17 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |   14 ++++++--
 .../org/apache/crunch/io/text/TextFileTarget.java  |   29 +++++++++++++++
 2 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/9a1c4276/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index d9545f8..c71ef23 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -162,9 +162,17 @@ public class MRPipeline implements Pipeline {
       } else {
         boolean materialized = false;
         for (Target t : outputTargets.get(c)) {
-          if (!materialized && t instanceof Source) {
-            c.materializeAt((SourceTarget) t);
-            materialized = true;
+          if (!materialized) {
+            if (t instanceof SourceTarget) {
+              c.materializeAt((SourceTarget) t);
+              materialized = true;
+            } else {
+              SourceTarget st = t.asSourceTarget(c.getPType());
+              if (st != null) {
+                c.materializeAt(st);
+                materialized = true;
+              }
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/9a1c4276/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index ec7d521..0c3e6a4 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.io.text;
 
+import org.apache.avro.Schema;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.SequentialFileNamingScheme;
@@ -25,8 +26,12 @@ import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.AvroTextOutputFormat;
+import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableType;
+import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
@@ -72,9 +77,33 @@ public class TextFileTarget extends FileTargetImpl {
 
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (!isTextCompatible(ptype)) {
+      return null;
+    }
     if (ptype instanceof PTableType) {
       return new TextFileTableSourceTarget(path, (PTableType) ptype);
     }
     return new TextFileSourceTarget<T>(path, ptype);
   }
+  
+  private <T> boolean isTextCompatible(PType<T> ptype) {
+    if (AvroTypeFamily.getInstance().equals(ptype.getFamily())) {
+      AvroType<T> at = (AvroType<T>) ptype;
+      if (at.getSchema().equals(Schema.create(Schema.Type.STRING))) {
+        return true;
+      }
+    } else if (WritableTypeFamily.getInstance().equals(ptype.getFamily())) {
+      if (ptype instanceof PTableType) {
+        PTableType ptt = (PTableType) ptype;
+        return isText(ptt.getKeyType()) && isText(ptt.getValueType());
+      } else {
+        return isText(ptype);
+      }
+    }
+    return false;
+  }
+  
+  private <T> boolean isText(PType<T> wtype) {
+    return Text.class.equals(((WritableType) wtype).getSerializationClass());
+  }
 }