You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/01/26 02:24:32 UTC
git commit: CRUNCH-144: Convert Targets to SourceTargets wherever possible to mark a PCollection as materialized, and update the rules on converting a text file target to SourceTargets.
Updated Branches:
refs/heads/master 5cf8142ae -> 9a1c42760
CRUNCH-144: Convert Targets to SourceTargets wherever possible to mark a PCollection as materialized,
and update the rules on converting a text file target to SourceTargets.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/9a1c4276
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/9a1c4276
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/9a1c4276
Branch: refs/heads/master
Commit: 9a1c42760b5c01a5d1dbdf6ee6bdc4f9fb8ca086
Parents: 5cf8142
Author: Josh Wills <jw...@apache.org>
Authored: Wed Jan 16 07:42:10 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 25 15:47:17 2013 -0800
----------------------------------------------------------------------
.../java/org/apache/crunch/impl/mr/MRPipeline.java | 14 ++++++--
.../org/apache/crunch/io/text/TextFileTarget.java | 29 +++++++++++++++
2 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/9a1c4276/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index d9545f8..c71ef23 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -162,9 +162,17 @@ public class MRPipeline implements Pipeline {
} else {
boolean materialized = false;
for (Target t : outputTargets.get(c)) {
- if (!materialized && t instanceof Source) {
- c.materializeAt((SourceTarget) t);
- materialized = true;
+ if (!materialized) {
+ if (t instanceof SourceTarget) {
+ c.materializeAt((SourceTarget) t);
+ materialized = true;
+ } else {
+ SourceTarget st = t.asSourceTarget(c.getPType());
+ if (st != null) {
+ c.materializeAt(st);
+ materialized = true;
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/9a1c4276/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index ec7d521..0c3e6a4 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -17,6 +17,7 @@
*/
package org.apache.crunch.io.text;
+import org.apache.avro.Schema;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.SequentialFileNamingScheme;
@@ -25,8 +26,12 @@ import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.AvroTextOutputFormat;
+import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableType;
+import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
@@ -72,9 +77,33 @@ public class TextFileTarget extends FileTargetImpl {
@Override
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+ if (!isTextCompatible(ptype)) {
+ return null;
+ }
if (ptype instanceof PTableType) {
return new TextFileTableSourceTarget(path, (PTableType) ptype);
}
return new TextFileSourceTarget<T>(path, ptype);
}
+
+ private <T> boolean isTextCompatible(PType<T> ptype) {
+ if (AvroTypeFamily.getInstance().equals(ptype.getFamily())) {
+ AvroType<T> at = (AvroType<T>) ptype;
+ if (at.getSchema().equals(Schema.create(Schema.Type.STRING))) {
+ return true;
+ }
+ } else if (WritableTypeFamily.getInstance().equals(ptype.getFamily())) {
+ if (ptype instanceof PTableType) {
+ PTableType ptt = (PTableType) ptype;
+ return isText(ptt.getKeyType()) && isText(ptt.getValueType());
+ } else {
+ return isText(ptype);
+ }
+ }
+ return false;
+ }
+
+ private <T> boolean isText(PType<T> wtype) {
+ return Text.class.equals(((WritableType) wtype).getSerializationClass());
+ }
}