Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/15 18:33:00 UTC

[jira] [Updated] (BEAM-11755) Cross-language consistency (RequiresStableInputs) is quietly broken (at least on portable flink runner)

     [ https://issues.apache.org/jira/browse/BEAM-11755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-11755:
-----------------------------------
    Priority: P1  (was: P0)

> Cross-language consistency (RequiresStableInputs) is quietly broken (at least on portable flink runner)
> -------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-11755
>                 URL: https://issues.apache.org/jira/browse/BEAM-11755
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-kafka, runner-flink, sdk-java-harness, sdk-py-core
>            Reporter: Marek Pikulski
>            Priority: P1
>
> Since the Python SDK does not seem to provide anything similar to [https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html], I wrote a small cross-language transform in Java, to be called from a Python SDK pipeline executed on the Flink runner. I expected it to perform the buffering needed to implement exactly-once semantics correctly in my use case.
> However, this did not result in the creation of any Flink checkpoints. The reason seems to be that the code in [https://github.com/apache/beam/blob/73731ec4f3f2d185e89aa3e378d321c2154ecf53/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L492] is never executed, because the UDF is called using the FnApiDoFnRunner instead.
> This behavior is particularly problematic because the RequiresStableInput annotation is *silently* ignored: users might falsely believe that they get exactly-once semantics (EOS), whereas they only get something like "at-least-once if the upstream pipeline happens to be deterministic" (which is not the case in general).
> Thus, if a user were to use, e.g., the Kafka EOS sink ([https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-]), which relies on the RequiresStableInput mechanism, in their cross-language Java UDF, the output might be incorrect (potentially not even at-least-once) whenever the upstream pipeline is not deterministic and needs to be replayed from a checkpoint.
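
The failure mode described above can be illustrated with a toy simulation. This is only a sketch under stated assumptions: `upstream`, `IdempotentSink`, and `run` are hypothetical names, and the sink that deduplicates by sequence id is a simplification for illustration, not a description of KafkaIO's internals or of Beam's runner code. It shows how a replay of a non-deterministic upstream can lose a record, while replaying checkpointed (stable) input does not.

```python
def upstream(attempt):
    """Hypothetical upstream step whose output order differs between attempts."""
    orders = [["a", "b", "c"], ["c", "a", "b"]]
    return orders[attempt % len(orders)]


class IdempotentSink:
    """Toy sink that commits each sequence id at most once (assumption)."""

    def __init__(self):
        self.committed = {}  # sequence id -> record

    def write(self, seq_id, record):
        # A retried write for an already committed sequence id is skipped.
        if seq_id not in self.committed:
            self.committed[seq_id] = record


def run(stable_input, crash_after=2):
    """Simulate one failed attempt followed by one successful retry."""
    sink = IdempotentSink()

    # With stable input, the elements are persisted in a checkpoint first,
    # so the retry sees exactly the same elements in the same order.
    checkpoint = list(upstream(0)) if stable_input else None

    # Attempt 0: the job crashes after `crash_after` writes.
    first = checkpoint if stable_input else upstream(0)
    for seq_id, record in enumerate(first[:crash_after]):
        sink.write(seq_id, record)

    # Attempt 1: replay the checkpointed elements, or recompute the upstream.
    retry = checkpoint if stable_input else upstream(1)
    for seq_id, record in enumerate(retry):
        sink.write(seq_id, record)

    return [sink.committed[i] for i in sorted(sink.committed)]


if __name__ == "__main__":
    print("stable input:  ", run(stable_input=True))
    print("unstable input:", run(stable_input=False))
```

In this model the run without stable input commits "b" twice and never commits "c", which is the "not even at-least-once" outcome mentioned in the report.
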
> I feel this issue should be prioritized because it essentially makes it impossible to achieve source-to-sink exactly-once guarantees when using Beam on Flink with the Python SDK.
> From a user perspective, either (or a combination) of the following would resolve the issue:
>  * Implement something like RequiresStableInput for the Python SDK's DoFn (and ensure that using RequiresStableInput in a Java-based DoFn results in an error if the latter is called from a pipeline defined with the Python SDK).
>  * Extend the FnApiDoFnRunner to provide stable inputs to DoFns that require it.
> Unfortunately, I do not feel familiar enough with the code base to address the issue myself, at least not without further guidance, so any feedback is welcome.
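
For discussion, the two options listed above can be sketched in a few lines. This is a minimal model, not Beam or Flink code: `StableInputBuffer`, `snapshot`, `on_checkpoint_complete`, `check_stable_input_supported`, and `UnsupportedStableInputError` are all hypothetical names. The buffer assumes that "stable input" means holding elements back until the checkpoint containing them has completed; the check models failing loudly instead of silently ignoring the requirement.

```python
class UnsupportedStableInputError(Exception):
    """Raised when stable input is required but cannot be provided."""


def check_stable_input_supported(requires_stable_input, runner_supports_it):
    """Fail at pipeline construction time rather than ignore the requirement."""
    if requires_stable_input and not runner_supports_it:
        raise UnsupportedStableInputError(
            "DoFn requires stable input, but this execution path cannot provide it"
        )


class StableInputBuffer:
    """Holds elements until the checkpoint that contains them has completed."""

    def __init__(self, process_fn):
        self._process_fn = process_fn
        self._pending = []    # elements received since the last snapshot
        self._snapshots = {}  # checkpoint id -> elements persisted in it

    def on_element(self, element):
        # Buffer only; the user function is not called yet.
        self._pending.append(element)

    def snapshot(self, checkpoint_id):
        # Move pending elements into the state written by this checkpoint.
        self._snapshots.setdefault(checkpoint_id, []).extend(self._pending)
        self._pending = []

    def on_checkpoint_complete(self, checkpoint_id):
        # Elements are durable now, so a replay would yield the same input.
        done = sorted(cid for cid in self._snapshots if cid <= checkpoint_id)
        for cid in done:
            for element in self._snapshots.pop(cid):
                self._process_fn(element)


if __name__ == "__main__":
    processed = []
    buf = StableInputBuffer(processed.append)
    buf.on_element("a")
    buf.on_element("b")
    buf.snapshot(1)
    buf.on_element("c")  # arrives after snapshot 1, so it waits for snapshot 2
    buf.on_checkpoint_complete(1)
    print(processed)
```

The trade-off in this model is latency: nothing reaches the user function until a checkpoint completes, so output is delayed by up to one checkpoint interval.
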



--
This message was sent by Atlassian Jira
(v8.3.4#803005)