Posted to commits@crunch.apache.org by jw...@apache.org on 2015/05/08 23:52:43 UTC

crunch git commit: CRUNCH-509: Add support for 'named' targets in Crunch-on-Spark

Repository: crunch
Updated Branches:
  refs/heads/master 9f4193163 -> 6e3560651


CRUNCH-509: Add support for 'named' targets in Crunch-on-Spark


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6e356065
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6e356065
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6e356065

Branch: refs/heads/master
Commit: 6e3560651ed3a0bd47cab2ad0ec1c808e599e050
Parents: 9f41931
Author: Josh Wills <jw...@apache.org>
Authored: Tue May 5 23:54:29 2015 +0100
Committer: Josh Wills <jw...@apache.org>
Committed: Fri May 8 22:43:02 2015 +0100

----------------------------------------------------------------------
 .../org/apache/crunch/io/avro/AvroFileTarget.java   |  8 +-------
 .../apache/crunch/io/avro/AvroPathPerKeyTarget.java |  8 +-------
 .../apache/crunch/types/avro/AvroOutputFormat.java  |  9 +--------
 .../org/apache/crunch/impl/spark/SparkRuntime.java  | 16 ++++++++++++++--
 4 files changed, 17 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
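
In short: the two Avro targets stop writing per-name "avro.output.schema.<name>"
keys, AvroOutputFormat stops looking them up via "crunch.namedoutput", and the
Spark runtime configures every target as a named output ("out0") and copies that
registration onto the Job it hands to Spark. Nothing changes for users; a hedged
usage sketch (the paths, names, and the From.textFile/To.avroFile overloads used
here are illustrative, not part of this commit):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.spark.SparkPipeline;
    import org.apache.crunch.io.From;
    import org.apache.crunch.io.To;
    import org.apache.crunch.types.avro.Avros;

    Pipeline pipeline = new SparkPipeline("local", "named-targets-demo");
    PCollection<String> lines =
        pipeline.read(From.textFile("/tmp/input", Avros.strings()));
    // With this commit, the Avro target below is registered as the named
    // output "out0", so its schema travels inside its own FormatBundle.
    pipeline.write(lines, To.avroFile("/tmp/output"));
    pipeline.done();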


http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index ed8d7b7..37aa05b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -76,16 +76,10 @@ public class AvroFileTarget extends FileTargetImpl {
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
     AvroType<?> atype = (AvroType<?>) ptype;
     FormatBundle bundle = FormatBundle.forOutput(AvroOutputFormat.class);
-    String schemaParam = null;
-    if (name == null) {
-      schemaParam = "avro.output.schema";
-    } else {
-      schemaParam = "avro.output.schema." + name;
-    }
     for (Map.Entry<String, String> e : extraConf.entrySet()) {
       bundle.set(e.getKey(), e.getValue());
     }
-    bundle.set(schemaParam, atype.getSchema().toString());
+    bundle.set("avro.output.schema", atype.getSchema().toString());
     AvroMode.fromType(atype).configure(bundle);
     configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle,
         outputPath, name);
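
The simplification above is safe because the schema now travels inside the
target's own FormatBundle rather than in a shared job-wide key: CrunchOutputs
keeps one bundle per named output, so the plain "avro.output.schema" key cannot
collide across outputs. A minimal sketch of that isolation (assuming
FormatBundle.configure applies the bundle's entries to the given Configuration,
as it does elsewhere in crunch-core; baseConf is a stand-in for the job's
base Configuration):

    // Each output's bundle carries its own copy of the schema key and is
    // applied to an isolated Configuration, never to the shared job config.
    FormatBundle bundle = FormatBundle.forOutput(AvroOutputFormat.class);
    bundle.set("avro.output.schema", atype.getSchema().toString());
    Configuration outputConf = new Configuration(baseConf);
    bundle.configure(outputConf); // only this output sees this schema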

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index 76a2cf7..336b940 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -78,13 +78,7 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
     AvroType<?> atype = (AvroType) ((PTableType) ptype).getValueType();
     FormatBundle bundle = FormatBundle.forOutput(AvroPathPerKeyOutputFormat.class);
-    String schemaParam;
-    if (name == null) {
-      schemaParam = "avro.output.schema";
-    } else {
-      schemaParam = "avro.output.schema." + name;
-    }
-    bundle.set(schemaParam, atype.getSchema().toString());
+    bundle.set("avro.output.schema", atype.getSchema().toString());
     AvroMode.fromType(atype).configure(bundle);
     configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle, outputPath, name);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 79736b8..30a7399 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -37,14 +37,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
 
   public static <S>  DataFileWriter<S> getDataFileWriter(Path path, Configuration conf) throws IOException {
-    Schema schema = null;
-    String outputName = conf.get("crunch.namedoutput");
-    if (outputName != null && !outputName.isEmpty()) {
-      schema = (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName));
-    } else {
-      schema = AvroJob.getOutputSchema(conf);
-    }
-
+    Schema schema = AvroJob.getOutputSchema(conf);
     DataFileWriter<S> writer = new DataFileWriter<S>(AvroMode.fromConfiguration(conf).<S>getWriter(schema));
 
     JobConf jc = new JobConf(conf);
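
With the per-output isolation in place, getDataFileWriter no longer needs the
"crunch.namedoutput" indirection: by the time this method runs, the bundle for
the output being written has already populated the plain key, and Avro's
AvroJob.getOutputSchema reads exactly that. Roughly, the surviving line is
equivalent to:

    // Assuming Avro's AvroJob.OUTPUT_SCHEMA constant is "avro.output.schema":
    Schema schema = new Schema.Parser().parse(conf.get("avro.output.schema"));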

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 3b5b419..4c0cb27 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -38,6 +38,7 @@ import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.spark.fn.MapFunction;
 import org.apache.crunch.impl.spark.fn.OutputConverterFunction;
 import org.apache.crunch.impl.spark.fn.PairMapFunction;
+import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.MapReduceTarget;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.materialize.MaterializableIterable;
@@ -323,7 +324,13 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
               Job job = new Job(conf);
               if (t instanceof PathTarget) {
                 PathTarget pt = (PathTarget) t;
-                pt.configureForMapReduce(job, ptype, pt.getPath(), null);
+                pt.configureForMapReduce(job, ptype, pt.getPath(), "out0");
+                CrunchOutputs.OutputConfig outConfig =
+                        CrunchOutputs.getNamedOutputs(job.getConfiguration()).get("out0");
+                job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+                job.setOutputKeyClass(outConfig.keyClass);
+                job.setOutputValueClass(outConfig.valueClass);
+                outConfig.bundle.configure(job.getConfiguration());
                 Path tmpPath = pipeline.createTempPath();
                 outRDD.saveAsNewAPIHadoopFile(
                     tmpPath.toString(),
@@ -334,7 +341,12 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
                 pt.handleOutputs(job.getConfiguration(), tmpPath, -1);
               } else if (t instanceof MapReduceTarget) {
                 MapReduceTarget mrt = (MapReduceTarget) t;
-                mrt.configureForMapReduce(job, ptype, new Path("/tmp"), null);
+                mrt.configureForMapReduce(job, ptype, new Path("/tmp"), "out0");
+                CrunchOutputs.OutputConfig outConfig =
+                        CrunchOutputs.getNamedOutputs(job.getConfiguration()).get("out0");
+                job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+                job.setOutputKeyClass(outConfig.keyClass);
+                job.setOutputValueClass(outConfig.valueClass);
                 outRDD.saveAsHadoopDataset(new JobConf(job.getConfiguration()));
               } else {
                 throw new IllegalArgumentException("Spark execution cannot handle non-MapReduceTarget: " + t);
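
The SparkRuntime change is the substantive half of the fix. Passing "out0"
instead of null makes each target register itself with CrunchOutputs as a named
output; since Spark's saveAsNewAPIHadoopFile and saveAsHadoopDataset write
exactly one output per job, the runtime then reads that registration back and
promotes it to the Job's primary output. A hedged sketch of the round trip (the
registration call lives in crunch-core's FileTargetImpl, and its exact
signature is an assumption here):

    // Registration side, performed by the target when name != null:
    CrunchOutputs.addNamedOutput(job, "out0", bundle, AvroWrapper.class,
        NullWritable.class);

    // Readback side, the new runtime code above: promote the named output's
    // format bundle and key/value classes to the Job's single real output.
    CrunchOutputs.OutputConfig out =
        CrunchOutputs.getNamedOutputs(job.getConfiguration()).get("out0");
    job.setOutputFormatClass(out.bundle.getFormatClass());
    job.setOutputKeyClass(out.keyClass);
    job.setOutputValueClass(out.valueClass);
    out.bundle.configure(job.getConfiguration());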