You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2022/06/14 17:27:20 UTC

[beam] branch master updated: [BEAM-14553] Add destination coder to FileResultCoder components (#17818)

This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ecea6de6dc8 [BEAM-14553] Add destination coder to FileResultCoder components (#17818)
ecea6de6dc8 is described below

commit ecea6de6dc8b7d2c682dcaaf860a0ecfe666d380
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Tue Jun 14 10:27:13 2022 -0700

    [BEAM-14553] Add destination coder to FileResultCoder components (#17818)
---
 .../core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java   | 7 ++++++-
 .../java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java | 6 ++++--
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 1f8e89caf61..b0349236131 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -1196,7 +1196,12 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
 
     @Override
     public List<? extends Coder<?>> getCoderArguments() {
-      return Arrays.asList(windowCoder);
+      return Collections.singletonList(destinationCoder);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getComponents() {
+      return Arrays.asList(windowCoder, destinationCoder);
     }
 
     @Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index ecc2a2c93c3..4019474c0e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -523,7 +523,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
                         public void process(ProcessContext c) {
                           c.output(c.element().withShard(UNKNOWN_SHARDNUM));
                         }
-                      }));
+                      }))
+              .setCoder(fileResultCoder);
       return PCollectionList.of(writtenBundleFiles)
           .and(writtenSpilledFiles)
           .apply(Flatten.pCollections())
@@ -807,7 +808,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
                         public void process(ProcessContext c) {
                           c.output(c.element().withShard(UNKNOWN_SHARDNUM));
                         }
-                      }));
+                      }))
+              .setCoder(fileResultCoder);
 
       // Group temp file results by destinations again to gather all the results in the same window.
       // This is needed since we don't have shard idx associated with each temp file so have to rely