You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/09/09 17:58:25 UTC
[beam] branch master updated: [BEAM-10863] Change encoding of
Pubsub sink to global window.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b6d0abb [BEAM-10863] Change encoding of Pubsub sink to global window.
new 8bae9b3 Merge pull request #12791 from [BEAM-10863] Change encoding of Pubsub sink to global window.
b6d0abb is described below
commit b6d0abbe1fa2d7d19f62419a528feeee9558e9ff
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Sep 8 17:51:56 2020 -0700
[BEAM-10863] Change encoding of Pubsub sink to global window.
---
.../main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e64e20f..4b1af69 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1460,9 +1460,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
stepContext.addInput(
PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));
- // No coder is needed in this case since the collection being written is already of
- // PubsubMessage, however the Dataflow backend require a coder to be set.
- stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(VoidCoder.of()));
+
+ // Using a GlobalWindowCoder as a place holder because GlobalWindowCoder is known coder.
+ stepContext.addEncodingInput(
+ WindowedValue.getFullCoder(VoidCoder.of(), GlobalWindow.Coder.INSTANCE));
stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
}