You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/23 19:40:42 UTC

[beam] branch master updated: [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6912011  [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change
     new 1c35224  Merge pull request #11192 from lukecwik/splittabledofn
6912011 is described below

commit 6912011bd7b9745d5a3fd195165482bcc39ffa32
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Mar 23 08:24:58 2020 -0700

    [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change
---
 .../runners/dataflow/DataflowPipelineTranslator.java | 20 ++++++++++++++------
 .../dataflow/DataflowPipelineTranslatorTest.java     |  3 ++-
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 5f7ec26..8be785e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -978,12 +978,16 @@ public class DataflowPipelineTranslator {
             if (context.isFnApi()) {
               DoFnSignature signature = DoFnSignatures.signatureForDoFn(transform.getFn());
               if (signature.processElement().isSplittable()) {
-                Coder<?> restrictionCoder =
-                    DoFnInvokers.invokerFor(transform.getFn())
-                        .invokeGetRestrictionCoder(
-                            context.getInput(transform).getPipeline().getCoderRegistry());
+                DoFnInvoker<?, ?> doFnInvoker = DoFnInvokers.invokerFor(transform.getFn());
+                Coder<?> restrictionAndWatermarkStateCoder =
+                    KvCoder.of(
+                        doFnInvoker.invokeGetRestrictionCoder(
+                            context.getInput(transform).getPipeline().getCoderRegistry()),
+                        doFnInvoker.invokeGetWatermarkEstimatorStateCoder(
+                            context.getInput(transform).getPipeline().getCoderRegistry()));
                 stepContext.addInput(
-                    PropertyNames.RESTRICTION_ENCODING, translateCoder(restrictionCoder, context));
+                    PropertyNames.RESTRICTION_ENCODING,
+                    translateCoder(restrictionAndWatermarkStateCoder, context));
               }
             }
           }
@@ -1190,7 +1194,11 @@ public class DataflowPipelineTranslator {
 
             stepContext.addInput(
                 PropertyNames.RESTRICTION_CODER,
-                translateCoder(transform.getRestrictionCoder(), context));
+                translateCoder(
+                    KvCoder.of(
+                        transform.getRestrictionCoder(),
+                        transform.getWatermarkEstimatorStateCoder()),
+                    context));
           }
         });
   }
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d48f060..775c782 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -733,7 +733,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
                 Structs.getObject(
                     processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER));
 
-    assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
+    assertEquals(
+        KvCoder.of(SerializableCoder.of(OffsetRange.class), VoidCoder.of()), restrictionCoder);
   }
 
   /** Smoke test to fail fast if translation of a splittable ParDo in FnAPI. */