You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/23 19:40:42 UTC
[beam] branch master updated: [BEAM-9430] Fix coder sent to
Dataflow service for non-portable pipelines due to WatermarkEstimators
migration change
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6912011 [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change
new 1c35224 Merge pull request #11192 from lukecwik/splittabledofn
6912011 is described below
commit 6912011bd7b9745d5a3fd195165482bcc39ffa32
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Mar 23 08:24:58 2020 -0700
[BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change
---
.../runners/dataflow/DataflowPipelineTranslator.java | 20 ++++++++++++++------
.../dataflow/DataflowPipelineTranslatorTest.java | 3 ++-
2 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 5f7ec26..8be785e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -978,12 +978,16 @@ public class DataflowPipelineTranslator {
if (context.isFnApi()) {
DoFnSignature signature = DoFnSignatures.signatureForDoFn(transform.getFn());
if (signature.processElement().isSplittable()) {
- Coder<?> restrictionCoder =
- DoFnInvokers.invokerFor(transform.getFn())
- .invokeGetRestrictionCoder(
- context.getInput(transform).getPipeline().getCoderRegistry());
+ DoFnInvoker<?, ?> doFnInvoker = DoFnInvokers.invokerFor(transform.getFn());
+ Coder<?> restrictionAndWatermarkStateCoder =
+ KvCoder.of(
+ doFnInvoker.invokeGetRestrictionCoder(
+ context.getInput(transform).getPipeline().getCoderRegistry()),
+ doFnInvoker.invokeGetWatermarkEstimatorStateCoder(
+ context.getInput(transform).getPipeline().getCoderRegistry()));
stepContext.addInput(
- PropertyNames.RESTRICTION_ENCODING, translateCoder(restrictionCoder, context));
+ PropertyNames.RESTRICTION_ENCODING,
+ translateCoder(restrictionAndWatermarkStateCoder, context));
}
}
}
@@ -1190,7 +1194,11 @@ public class DataflowPipelineTranslator {
stepContext.addInput(
PropertyNames.RESTRICTION_CODER,
- translateCoder(transform.getRestrictionCoder(), context));
+ translateCoder(
+ KvCoder.of(
+ transform.getRestrictionCoder(),
+ transform.getWatermarkEstimatorStateCoder()),
+ context));
}
});
}
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d48f060..775c782 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -733,7 +733,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Structs.getObject(
processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER));
- assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
+ assertEquals(
+ KvCoder.of(SerializableCoder.of(OffsetRange.class), VoidCoder.of()), restrictionCoder);
}
/** Smoke test to fail fast if translation of a splittable ParDo in FnAPI is broken. */