Posted to commits@beam.apache.org by lc...@apache.org on 2020/06/17 20:27:49 UTC

[beam] branch master updated: [BEAM-3489] DataflowRunner to read PubsubMessage if getNeedsMessageId is true (#11873)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c0dd6e7  [BEAM-3489] DataflowRunner to read PubsubMessage if getNeedsMessageId is true (#11873)
c0dd6e7 is described below

commit c0dd6e72ebe5103f817d8c7ccaf02f1a67368aa5
Author: Thinh Ha <th...@gmail.com>
AuthorDate: Wed Jun 17 21:27:30 2020 +0100

    [BEAM-3489] DataflowRunner to read PubsubMessage if getNeedsMessageId is true (#11873)
    
    * DataflowRunner to read PubsubMessage if getNeedsMessageId is true
    
    * changed | to ||
    
    * spotless apply
---
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a6358c1..38d88e3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1353,11 +1353,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
       }
       // In both cases, the transform needs to read PubsubMessage. However, in case it needs
-      // the attributes, we supply an identity "parse fn" so the worker will read PubsubMessage's
-      // from Windmill and simply pass them around; and in case it doesn't need attributes,
-      // we're already implicitly using a "Coder" that interprets the data as a PubsubMessage's
-      // payload.
-      if (overriddenTransform.getNeedsAttributes()) {
+      // the attributes or messageId, we supply an identity "parse fn" so the worker will
+      // read PubsubMessage's from Windmill and simply pass them around; and in case it
+      // doesn't need attributes, we're already implicitly using a "Coder" that interprets
+      // the data as a PubsubMessage's payload.
+      if (overriddenTransform.getNeedsAttributes() || overriddenTransform.getNeedsMessageId()) {
         stepContext.addInput(
             PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
             byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));