You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/09/09 17:58:25 UTC

[beam] branch master updated: [BEAM-10863] Change encoding of Pubsub sink to global window.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b6d0abb  [BEAM-10863] Change encoding of Pubsub sink to global window.
     new 8bae9b3  Merge pull request #12791 from [BEAM-10863] Change encoding of Pubsub sink to global window.
b6d0abb is described below

commit b6d0abbe1fa2d7d19f62419a528feeee9558e9ff
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Sep 8 17:51:56 2020 -0700

    [BEAM-10863] Change encoding of Pubsub sink to global window.
---
 .../main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e64e20f..4b1af69 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1460,9 +1460,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       stepContext.addInput(
           PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
           byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));
-      // No coder is needed in this case since the collection being written is already of
-      // PubsubMessage, however the Dataflow backend require a coder to be set.
-      stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(VoidCoder.of()));
+
+      // Using a GlobalWindowCoder as a place holder because GlobalWindowCoder is known coder.
+      stepContext.addEncodingInput(
+          WindowedValue.getFullCoder(VoidCoder.of(), GlobalWindow.Coder.INSTANCE));
       stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
     }
   }