Posted to github@beam.apache.org by "kkdoon (via GitHub)" <gi...@apache.org> on 2023/09/20 06:26:10 UTC

[GitHub] [beam] kkdoon opened a new issue, #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used

kkdoon opened a new issue, #28554:
URL: https://github.com/apache/beam/issues/28554

   ### What happened?
   
   **Issue:**
   A Flink pipeline cannot be drained when the `@RequiresStableInput` annotation is used. Stack trace when the drain is triggered:
   `Caused by: java.lang.RuntimeException: There are still watermark holds left when terminating operator KVStoreTransform/GroupIntoBatches/ParDo(GroupIntoBatches)/ParMultiDo(GroupIntoBatches) -> PersistenceTransform/WriteSpendStateToDb/ParMultiDo(WriteSpendStateDbStreaming) (1/1)#0 Watermark held 1695147850921
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.flushData(DoFnOperator.java:631)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.AbstractStreamOperatorCompat.finish(AbstractStreamOperatorCompat.java:67)
       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:239)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:213)
       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:186)
       ... 13 more
   `
   
   **Cause:**
   When drain is triggered, MAX_WATERMARK is emitted before the last checkpoint barrier. This triggers all registered event-time timers and flushes out any state. The expectation is therefore that by the time [flushData](https://github.com/apache/beam/blob/79d0a8d11095522a036a6b3007f5fed4f6f46b3b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L608) is finally invoked in DoFnOperator, all event-time timers have fired and the watermark has advanced.
   
   However, when the RequiresStableInput annotation is used, the DoFn processes its input only **after** the checkpoint operation is complete. Since flush is invoked while the final checkpoint/savepoint operation is still in progress, the DoFn with the RequiresStableInput annotation holds the watermark, because its data is unprocessed and is waiting for the checkpoint to complete.
   
   **Potential Solutions:**
   1. Skip [this](https://github.com/apache/beam/blob/79d0a8d11095522a036a6b3007f5fed4f6f46b3b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L629) check in case the DoFn has RequiresStableInput set, since we know that all the final pending data will be processed after the final savepoint operation completes.
   2. Trigger `bufferingDoFnRunner.checkpointCompleted` within `flushData` to ensure that checkpointing is completed and all pending processing of buffered data is done within flushData itself.
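The cause described above can be illustrated with a small standalone model. This is only a sketch, not Beam or Flink code: the class `StableInputDrainModel` and all of its methods are hypothetical stand-ins for the buffering behaviour of a stable-input operator, and `finish()` stands in for the watermark hold check that `flushData` performs.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Beam code) of an operator that buffers input until a checkpoint completes. */
final class StableInputDrainModel {
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Timestamps of elements that wait for a completed checkpoint.
    private final List<Long> buffered = new ArrayList<>();
    // Timestamps of elements that were handed to the (imaginary) DoFn.
    private final List<Long> processed = new ArrayList<>();

    /** Stable-input semantics: the element is only buffered here, not processed. */
    void processElement(long timestamp) {
        buffered.add(timestamp);
    }

    /** Buffered elements are released once a checkpoint is known to be complete. */
    void notifyCheckpointComplete() {
        processed.addAll(buffered);
        buffered.clear();
    }

    /** The watermark cannot pass the oldest buffered element. */
    long watermarkHold() {
        return buffered.stream().mapToLong(Long::longValue).min().orElse(MAX_WATERMARK);
    }

    /** Models the check made on drain, before the final checkpoint has completed. */
    void finish() {
        long hold = watermarkHold();
        if (hold < MAX_WATERMARK) {
            throw new IllegalStateException(
                "There are still watermark holds left. Watermark held " + hold);
        }
    }

    List<Long> processed() {
        return processed;
    }

    public static void main(String[] args) {
        StableInputDrainModel op = new StableInputDrainModel();
        op.processElement(1695147850921L);
        try {
            op.finish(); // drain: runs while the element is still buffered
        } catch (IllegalStateException e) {
            System.out.println("drain failed: " + e.getMessage());
        }
        op.notifyCheckpointComplete(); // a completed checkpoint releases the buffer
        op.finish();                   // no hold is left, so this succeeds
        System.out.println("processed after checkpoint: " + op.processed());
    }
}
```

In this model the drain only succeeds when the buffer is already empty at the time `finish()` runs, which matches the behaviour reported in this issue.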
   
   
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kkdoon commented on issue #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used

Posted by "kkdoon (via GitHub)" <gi...@apache.org>.
kkdoon commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1731750827

   My job only has 1 DoFn using the `@RequiresStableInput` annotation (at the terminal node). Job graph looks as follows:
   
   `KafkaIO -> DoFn -> DoFn with RequiresStableInput`
   
   The job always fails during the final savepoint operation with the watermark hold error, not after the savepoint is taken (it fails between steps b and c).




Re: [I] [Bug]: Unable to drain Flink job when RequiresStableInput is used [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1776607125

   Closed via #29102




[GitHub] [beam] je-ik commented on issue #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1731841926

   This feels strange. Can you try to log the elements arriving at the stable DoFn after the savepoint to see where they are coming from? I would be surprised if they were coming from the KafkaIO (provided it stops sending data before triggering the checkpoint).




Re: [I] [Bug]: Unable to drain Flink job when RequiresStableInput is used [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik closed issue #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used
URL: https://github.com/apache/beam/issues/28554




Re: [I] [Bug]: Unable to drain Flink job when RequiresStableInput is used [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1746498546

   Yes, if the final checkpoint fails (for whatever reason), then the retry can yield a different result than the first run. I think we have only two options:
    a) best-effort, i.e. flushing our buffer in `flushData`, which is what it was intended for, or
    b) fail the pipeline, as draining is incompatible with stable DoFns.
    
   Currently, we do b), which is actually semantically correct. We can change the exception to be more explanatory or, even better, throw the exception unconditionally if the DoFn has stable input (currently it might fail non-deterministically).
   
   Because the choice between a) and b) might be subject to user decision, we might introduce a flag in `FlinkPipelineOptions` that opts in to switching from b) to a).
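A minimal sketch of how such an opt-in could select between the two behaviours is shown below. It is a standalone illustration, not the Beam implementation: the class `DrainBehaviorSketch`, the enum `StableInputDrainMode` and its constants are hypothetical names, and the `FAIL` branch models the current behaviour of failing only when data is still buffered rather than the unconditional failure also suggested above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model (not Beam code) of a drain-time flush that is either strict or best-effort. */
final class DrainBehaviorSketch {
    /** Hypothetical setting: FAIL models option b), BEST_EFFORT_FLUSH models option a). */
    enum StableInputDrainMode { FAIL, BEST_EFFORT_FLUSH }

    private final StableInputDrainMode mode;
    private final Queue<String> buffer = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();

    DrainBehaviorSketch(StableInputDrainMode mode) {
        this.mode = mode;
    }

    /** Stable input: elements wait in the buffer for a completed checkpoint. */
    void buffer(String element) {
        buffer.add(element);
    }

    /** Called on drain, before the final checkpoint is known to be complete. */
    void flushData() {
        if (mode == StableInputDrainMode.BEST_EFFORT_FLUSH) {
            // Option a): process the buffered elements now, which releases the hold
            // but gives up the stable-input guarantee for these last elements.
            while (!buffer.isEmpty()) {
                output.add(buffer.poll());
            }
        }
        if (!buffer.isEmpty()) {
            // Option b): draining with pending stable-input data is rejected.
            throw new IllegalStateException(
                "Cannot drain: " + buffer.size() + " element(s) still buffered for stable input");
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        DrainBehaviorSketch lenient =
            new DrainBehaviorSketch(StableInputDrainMode.BEST_EFFORT_FLUSH);
        lenient.buffer("a");
        lenient.buffer("b");
        lenient.flushData();
        System.out.println("flushed on drain: " + lenient.output());
    }
}
```

Keeping the strict mode as the default preserves the semantically correct behaviour, so a user has to accept the weaker guarantee explicitly.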




Re: [I] [Bug]: Unable to drain Flink job when RequiresStableInput is used [beam]

Posted by "kkdoon (via GitHub)" <gi...@apache.org>.
kkdoon commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1747311798

   ok, adding a flag for drain behavior sounds reasonable. 
   
   Lastly, I wanted to double-check what issues you see if we instead skip the watermark hold check inside `flushData` and do it inside `notifyCheckpointComplete` (only when drain is initiated). That way we still maintain the contract of flushing after the checkpoint is completed. In this case, if the final checkpoint fails, it's still ok, since the drain will fail and the pipeline will continue running (and the user can retry later). And if the checkpoint is successful but the stable DoFn fails, then the drain will finish successfully with a warning message (to restore the last savepoint and re-run the pipeline, since some elements were not processed).
   
   If the approach of skipping the watermark check doesn't seem correct, then I will update my [PR](https://github.com/apache/beam/pull/28567) to implement approach a).




[GitHub] [beam] je-ik commented on issue #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1742501922

   Sorry for the late reaction, I was OOO.
   
   I once again walked through the code and it seems to make sense now. Here is the problem:
    a) the `flushData` method is wired up to https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/StreamOperator.html#finish-- (https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#close-- in Flink 1.12)
    b) this method is apparently called prior to the call to `notifyCheckpointComplete`, which means we still have buffered data
   
   The only solution then seems to be to make sure that in the call to `flushData` we clean the buffer (process all buffered data and emit outputs, clearing the watermark hold). All this should happen immediately *before* the last checkpoint is completed and thus it should not break the stable input DoFn's contract.




[GitHub] [beam] kkdoon commented on issue #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used

Posted by "kkdoon (via GitHub)" <gi...@apache.org>.
kkdoon commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1732422840

   These events are the buffered events that are emitted from DoFn1. I tried the following steps:
   1. Published 10 elements to the Kafka topic and stopped the producer job.
   2. Verified that the 10 elements were processed by DoFn1 and that the stable DoFn did not receive them.
   3. Drained the job; it failed to execute the savepoint (and did not emit any of the 10 buffered elements).
   
   From my experience, the only case in which the drain operation works is when the buffered elements have been fully processed (via a checkpoint trigger) and the buffer is empty at the time the drain is executed.




Re: [I] [Bug]: Unable to drain Flink job when RequiresStableInput is used [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1747399873

   There are two problems:
    a) there can be other reasons why there are watermark holds besides stable input buffering, so skipping the checks would hide those as well
    b) draining a pipeline first updates the watermark to infinity, which (as documented) would result in incorrect output if the pipeline is later resumed from the final savepoint (it would move the watermark back in time, which is a correctness bug). Draining is meant only for cases when the pipeline will not be resumed later.
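Point b) can be made concrete with a tiny standalone model of watermark monotonicity. This is a sketch under simplifying assumptions, not Beam or Flink code: the class `WatermarkRegressionSketch` is hypothetical and tracks a single watermark value, whereas a real pipeline tracks watermarks per operator and per input.

```java
/** Toy model (not Beam code): a watermark must never move backwards. */
final class WatermarkRegressionSketch {
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private long current = Long.MIN_VALUE;

    /** Accepts the new watermark only if it does not move back in time. */
    boolean advance(long newWatermark) {
        if (newWatermark < current) {
            return false; // regression: downstream has already seen a later watermark
        }
        current = newWatermark;
        return true;
    }

    long current() {
        return current;
    }

    public static void main(String[] args) {
        WatermarkRegressionSketch downstream = new WatermarkRegressionSketch();
        System.out.println(downstream.advance(1_000L));        // normal progress
        System.out.println(downstream.advance(MAX_WATERMARK)); // drain advances to end of time
        // A job resumed from the final savepoint would report source watermarks
        // near the restored position, far behind what was already emitted.
        System.out.println(downstream.advance(2_000L));
    }
}
```

The last call models the resumed pipeline: every timer and window has already fired at the end of time, so any watermark the restored sources report is a step backwards.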




[GitHub] [beam] je-ik commented on issue #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1730987899

   From my understanding, using `stop` with `--savepoint` and `--drain` should result in the following sequence:
    a) sources stop emitting data
    b) MAX_WATERMARK is emitted
    c) savepoint is taken
    d) job is terminated
    
   This would IMO result in correct job termination; what seems questionable is this sentence from the docs:
   ```
   The job will keep running until all sources properly shut down. This allows the job to finish processing all in-flight data, which can produce some records to process after the savepoint taken while stopping.
   ```
   If there is some possibility that some data is emitted *after* the savepoint, then this violates the requirements we have for @RequiresStableInput. 




[GitHub] [beam] je-ik commented on issue #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1729080485

   @dmvk Is it possible that this is a bug in Flink? I think there should be a checkpoint barrier strictly before MAX_WATERMARK is emitted; does that make sense?




Re: [I] [Bug]: Unable to drain Flink job when RequiresStableInput is used [beam]

Posted by "kkdoon (via GitHub)" <gi...@apache.org>.
kkdoon commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1747415437

   Thanks for the clarification! Makes sense to make this operation best-effort in that case.




Re: [I] [Bug]: Unable to drain Flink job when RequiresStableInput is used [beam]

Posted by "kkdoon (via GitHub)" <gi...@apache.org>.
kkdoon commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1745550628

   No worries and thanks for your response.
   
   I wanted to double-check: if we flush the buffer within `flushData` AND before the final checkpoint operation is complete, won't that violate the stable input DoFn contract (where the stable DoFn should only be invoked once state is checkpointed)? Or is it ok in the case of a drain operation? I might be misunderstanding the sequence of operations here.
   
   The issue I see is that once we flush the buffer, it would trigger the downstream DoFn processing. In the scenario where the downstream stable DoFn operation fails (say it's writing to a sink and the sink is unavailable, and/or the checkpoint itself times out), our final checkpoint and drain operation will fail, which could lead to inconsistent state.




[GitHub] [beam] je-ik commented on issue #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1729217492

   @kkdoon can you please clarify how you do the drain? According to the [docs][1], using stop and drain with savepoint should do all the necessary steps for the pipeline to terminate. Is this not working in your case?
   
   [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#terminating-a-job




[GitHub] [beam] kkdoon commented on issue #28554: [Bug]: Unable to drain Flink job when RequiresStableInput is used

Posted by "kkdoon (via GitHub)" <gi...@apache.org>.
kkdoon commented on issue #28554:
URL: https://github.com/apache/beam/issues/28554#issuecomment-1730374841

   I have tried stopping the job using both the [CLI](https://github.com/apache/flink/blob/5682472e4029c25d4a2651a609c999029fa3281b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L541) (with --drain) and the Flink [RestClusterClient library](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/client/program/rest/RestClusterClient.html#stopWithSavepoint-org.apache.flink.api.common.JobID-boolean-java.lang.String-org.apache.flink.core.execution.SavepointFormatType-), where `advanceToEndOfTime` is set to `true` to drain the job.
   
   Note that when drain is set to false, the pipeline is able to stop successfully with a savepoint, and the DoFn with the `RequiresStableInput` annotation does emit the buffered results. There's no error since [flushData](https://github.com/apache/beam/blob/79d0a8d11095522a036a6b3007f5fed4f6f46b3b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L608) is not invoked and the watermark does not advance to MAX_WATERMARK.
   
   So this issue only affects the specific scenario in which the drain flag is used to advance the watermark and permanently stop the pipeline.
   
   

