You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2019/09/23 18:31:28 UTC
[crunch] branch master updated: CRUNCH-670: Make AvroPathPerKeyTarget work with the Spark Runtime.
This is an automated email from the ASF dual-hosted git repository.
jwills pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git
The following commit(s) were added to refs/heads/master by this push:
new b58363b CRUNCH-670: Make AvroPathPerKeyTarget work with the Spark Runtime.
b58363b is described below
commit b58363bef9916be59ad0130c5c0abf210a4addfb
Author: Josh Wills <jw...@apache.org>
AuthorDate: Mon Sep 23 10:57:52 2019 -0700
CRUNCH-670: Make AvroPathPerKeyTarget work with the Spark Runtime.
---
.../main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java | 5 +++++
.../apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java | 2 +-
.../org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java | 2 +-
3 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index a3ecbb8..9f76b45 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -92,6 +92,11 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
@Override
public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
FileSystem srcFs = workingPath.getFileSystem(conf);
+ if (index == -1) {
+ // Map the -1 index from the SparkRuntime to the (correct) out0 value that
+ // the AvroPathPerKeyTarget expects.
+ index = 0;
+ }
Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
if (!srcFs.exists(base)) {
LOG.warn("Nothing to copy from {}", base);
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
index 852483d..087e935 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
@@ -42,7 +42,7 @@ public class AvroParquetPathPerKeyOutputFormat<T> extends FileOutputFormat<AvroW
public RecordWriter<AvroWrapper<Pair<Utf8, T>>, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
Configuration conf = taskAttemptContext.getConfiguration();
- Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "part"));
+ Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "out0"));
return new AvroParquetFilePerKeyRecordWriter<T>(basePath,
getUniqueFile(taskAttemptContext, "part", ".parquet"), conf);
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
index da84fc0..6563707 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
@@ -44,7 +44,7 @@ public class AvroPathPerKeyOutputFormat<T> extends FileOutputFormat<AvroWrapper<
public RecordWriter<AvroWrapper<Pair<Utf8, T>>, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
Configuration conf = taskAttemptContext.getConfiguration();
- Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "part"));
+ Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "out0"));
return new AvroFilePerKeyRecordWriter<T>(basePath, getUniqueFile(taskAttemptContext, "part", ".avro"), conf);
}