You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that was not preserved in this plain-text copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/05/08 23:52:43 UTC
crunch git commit: CRUNCH-509: Add support for 'named' targets in Crunch-on-Spark
Repository: crunch
Updated Branches:
refs/heads/master 9f4193163 -> 6e3560651
CRUNCH-509: Add support for 'named' targets in Crunch-on-Spark
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6e356065
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6e356065
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6e356065
Branch: refs/heads/master
Commit: 6e3560651ed3a0bd47cab2ad0ec1c808e599e050
Parents: 9f41931
Author: Josh Wills <jw...@apache.org>
Authored: Tue May 5 23:54:29 2015 +0100
Committer: Josh Wills <jw...@apache.org>
Committed: Fri May 8 22:43:02 2015 +0100
----------------------------------------------------------------------
.../org/apache/crunch/io/avro/AvroFileTarget.java | 8 +-------
.../apache/crunch/io/avro/AvroPathPerKeyTarget.java | 8 +-------
.../apache/crunch/types/avro/AvroOutputFormat.java | 9 +--------
.../org/apache/crunch/impl/spark/SparkRuntime.java | 16 ++++++++++++++--
4 files changed, 17 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index ed8d7b7..37aa05b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -76,16 +76,10 @@ public class AvroFileTarget extends FileTargetImpl {
public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
AvroType<?> atype = (AvroType<?>) ptype;
FormatBundle bundle = FormatBundle.forOutput(AvroOutputFormat.class);
- String schemaParam = null;
- if (name == null) {
- schemaParam = "avro.output.schema";
- } else {
- schemaParam = "avro.output.schema." + name;
- }
for (Map.Entry<String, String> e : extraConf.entrySet()) {
bundle.set(e.getKey(), e.getValue());
}
- bundle.set(schemaParam, atype.getSchema().toString());
+ bundle.set("avro.output.schema", atype.getSchema().toString());
AvroMode.fromType(atype).configure(bundle);
configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle,
outputPath, name);
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index 76a2cf7..336b940 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -78,13 +78,7 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
AvroType<?> atype = (AvroType) ((PTableType) ptype).getValueType();
FormatBundle bundle = FormatBundle.forOutput(AvroPathPerKeyOutputFormat.class);
- String schemaParam;
- if (name == null) {
- schemaParam = "avro.output.schema";
- } else {
- schemaParam = "avro.output.schema." + name;
- }
- bundle.set(schemaParam, atype.getSchema().toString());
+ bundle.set("avro.output.schema", atype.getSchema().toString());
AvroMode.fromType(atype).configure(bundle);
configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle, outputPath, name);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 79736b8..30a7399 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -37,14 +37,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
public static <S> DataFileWriter<S> getDataFileWriter(Path path, Configuration conf) throws IOException {
- Schema schema = null;
- String outputName = conf.get("crunch.namedoutput");
- if (outputName != null && !outputName.isEmpty()) {
- schema = (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName));
- } else {
- schema = AvroJob.getOutputSchema(conf);
- }
-
+ Schema schema = AvroJob.getOutputSchema(conf);
DataFileWriter<S> writer = new DataFileWriter<S>(AvroMode.fromConfiguration(conf).<S>getWriter(schema));
JobConf jc = new JobConf(conf);
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e356065/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 3b5b419..4c0cb27 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -38,6 +38,7 @@ import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.spark.fn.MapFunction;
import org.apache.crunch.impl.spark.fn.OutputConverterFunction;
import org.apache.crunch.impl.spark.fn.PairMapFunction;
+import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.MapReduceTarget;
import org.apache.crunch.io.PathTarget;
import org.apache.crunch.materialize.MaterializableIterable;
@@ -323,7 +324,13 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
Job job = new Job(conf);
if (t instanceof PathTarget) {
PathTarget pt = (PathTarget) t;
- pt.configureForMapReduce(job, ptype, pt.getPath(), null);
+ pt.configureForMapReduce(job, ptype, pt.getPath(), "out0");
+ CrunchOutputs.OutputConfig outConfig =
+ CrunchOutputs.getNamedOutputs(job.getConfiguration()).get("out0");
+ job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+ job.setOutputKeyClass(outConfig.keyClass);
+ job.setOutputValueClass(outConfig.valueClass);
+ outConfig.bundle.configure(job.getConfiguration());
Path tmpPath = pipeline.createTempPath();
outRDD.saveAsNewAPIHadoopFile(
tmpPath.toString(),
@@ -334,7 +341,12 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
pt.handleOutputs(job.getConfiguration(), tmpPath, -1);
} else if (t instanceof MapReduceTarget) {
MapReduceTarget mrt = (MapReduceTarget) t;
- mrt.configureForMapReduce(job, ptype, new Path("/tmp"), null);
+ mrt.configureForMapReduce(job, ptype, new Path("/tmp"), "out0");
+ CrunchOutputs.OutputConfig outConfig =
+ CrunchOutputs.getNamedOutputs(job.getConfiguration()).get("out0");
+ job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+ job.setOutputKeyClass(outConfig.keyClass);
+ job.setOutputValueClass(outConfig.valueClass);
outRDD.saveAsHadoopDataset(new JobConf(job.getConfiguration()));
} else {
throw new IllegalArgumentException("Spark execution cannot handle non-MapReduceTarget: " + t);