You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2022/06/14 17:27:20 UTC
[beam] branch master updated: [BEAM-14553] Add destination coder to FileResultCoder components (#17818)
This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ecea6de6dc8 [BEAM-14553] Add destination coder to FileResultCoder components (#17818)
ecea6de6dc8 is described below
commit ecea6de6dc8b7d2c682dcaaf860a0ecfe666d380
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Tue Jun 14 10:27:13 2022 -0700
[BEAM-14553] Add destination coder to FileResultCoder components (#17818)
---
.../core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java | 7 ++++++-
.../java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java | 6 ++++--
2 files changed, 10 insertions(+), 3 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 1f8e89caf61..b0349236131 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -1196,7 +1196,12 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
@Override
public List<? extends Coder<?>> getCoderArguments() {
- return Arrays.asList(windowCoder);
+ return Collections.singletonList(destinationCoder);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getComponents() {
+ return Arrays.asList(windowCoder, destinationCoder);
}
@Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index ecc2a2c93c3..4019474c0e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -523,7 +523,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
public void process(ProcessContext c) {
c.output(c.element().withShard(UNKNOWN_SHARDNUM));
}
- }));
+ }))
+ .setCoder(fileResultCoder);
return PCollectionList.of(writtenBundleFiles)
.and(writtenSpilledFiles)
.apply(Flatten.pCollections())
@@ -807,7 +808,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
public void process(ProcessContext c) {
c.output(c.element().withShard(UNKNOWN_SHARDNUM));
}
- }));
+ }))
+ .setCoder(fileResultCoder);
// Group temp file results by destinations again to gather all the results in the same window.
// This is needed since we don't have shard idx associated with each temp file so have to rely