You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@crunch.apache.org by jw...@apache.org on 2019/09/23 18:31:28 UTC

[crunch] branch master updated: CRUNCH-670: Make AvroPathPerKeyTarget work with the Spark Runtime.

This is an automated email from the ASF dual-hosted git repository.

jwills pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git


The following commit(s) were added to refs/heads/master by this push:
     new b58363b  CRUNCH-670: Make AvroPathPerKeyTarget work with the Spark Runtime.
b58363b is described below

commit b58363bef9916be59ad0130c5c0abf210a4addfb
Author: Josh Wills <jw...@apache.org>
AuthorDate: Mon Sep 23 10:57:52 2019 -0700

    CRUNCH-670: Make AvroPathPerKeyTarget work with the Spark Runtime.
---
 .../main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java    | 5 +++++
 .../apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java  | 2 +-
 .../org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java     | 2 +-
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index a3ecbb8..9f76b45 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -92,6 +92,11 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
   @Override
   public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
     FileSystem srcFs = workingPath.getFileSystem(conf);
+    if (index == -1) {
+      // Map the -1 index from the SparkRuntime to the (correct) out0 value that
+      // the AvroPathPerKeyTarget expects.
+      index = 0;
+    }
     Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
     if (!srcFs.exists(base)) {
       LOG.warn("Nothing to copy from {}", base);
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
index 852483d..087e935 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
@@ -42,7 +42,7 @@ public class AvroParquetPathPerKeyOutputFormat<T> extends FileOutputFormat<AvroW
   public RecordWriter<AvroWrapper<Pair<Utf8, T>>, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
     Configuration conf = taskAttemptContext.getConfiguration();
-    Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "part"));
+    Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "out0"));
     return new AvroParquetFilePerKeyRecordWriter<T>(basePath,
         getUniqueFile(taskAttemptContext, "part", ".parquet"), conf);
   }
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
index da84fc0..6563707 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
@@ -44,7 +44,7 @@ public class AvroPathPerKeyOutputFormat<T> extends FileOutputFormat<AvroWrapper<
   public RecordWriter<AvroWrapper<Pair<Utf8, T>>, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
     Configuration conf = taskAttemptContext.getConfiguration();
-    Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "part"));
+    Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "out0"));
     return new AvroFilePerKeyRecordWriter<T>(basePath, getUniqueFile(taskAttemptContext, "part", ".avro"), conf);
   }