You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/06/17 20:27:49 UTC
[beam] branch master updated: [BEAM-3489] DataflowRunner to read
PubsubMessage if getNeedsMessageId is true (#11873)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c0dd6e7 [BEAM-3489] DataflowRunner to read PubsubMessage if getNeedsMessageId is true (#11873)
c0dd6e7 is described below
commit c0dd6e72ebe5103f817d8c7ccaf02f1a67368aa5
Author: Thinh Ha <th...@gmail.com>
AuthorDate: Wed Jun 17 21:27:30 2020 +0100
[BEAM-3489] DataflowRunner to read PubsubMessage if getNeedsMessageId is true (#11873)
* DataflowRunner to read PubsubMessage if getNeedsMessageId is true
* changed | to ||
* spotless apply
---
.../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a6358c1..38d88e3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1353,11 +1353,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
}
// In both cases, the transform needs to read PubsubMessage. However, in case it needs
- // the attributes, we supply an identity "parse fn" so the worker will read PubsubMessage's
- // from Windmill and simply pass them around; and in case it doesn't need attributes,
- // we're already implicitly using a "Coder" that interprets the data as a PubsubMessage's
- // payload.
- if (overriddenTransform.getNeedsAttributes()) {
+ // the attributes or messageId, we supply an identity "parse fn" so the worker will
+ // read PubsubMessage's from Windmill and simply pass them around; and in case it
+ // doesn't need attributes, we're already implicitly using a "Coder" that interprets
+ // the data as a PubsubMessage's payload.
+ if (overriddenTransform.getNeedsAttributes() || overriddenTransform.getNeedsMessageId()) {
stepContext.addInput(
PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));