Posted to github@beam.apache.org by "psolomin (via GitHub)" <gi...@apache.org> on 2023/03/25 01:15:44 UTC

[GitHub] [beam] psolomin opened a new issue, #25975: [Bug]: Reducing parallelism in FlinkRunner leads to a data loss

psolomin opened a new issue, #25975:
URL: https://github.com/apache/beam/issues/25975

   ### What happened?
   
   Beam 2.46.0
   Java 11
   
   According to the [doc](https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner) FlinkRunner supports options `parallelism` and `maxParallelism`. When `parallelism` is reduced between restarts with a savepoint, some bundles are never dispatched.
   
   How to reproduce:
   1. Start a Beam-Flink streaming job with `parallelism=3`, and `maxParallelism` not set
   2. Stop the job with a savepoint
   3. Re-start the job from the savepoint with a reduced `parallelism` (for example, `parallelism=2`)
   
   If the job is then re-started again from that savepoint with original `parallelism=3`, the bundles are dispatched and pass through.
   
   Setting `maxParallelism=3` and keeping it across app restarts does not change this behaviour.
   
   This is a simplified job code (AWS Kinesis source, 3 shards):
   
   ```
   PCollection<KinesisRecord> windowedRecords = p.apply("Source", reader)
                   .apply("Fixed windows", Window.<KinesisRecord>into(FixedWindows.of(Duration.standardSeconds(60))))
                   ...
                   .apply(
                           "Sink to S3",
                           FileIO.<GenericRecord>write()
                                   .via(ParquetIO.sink(ConsumedEvent.SCHEMA$)
                                           .withCompressionCodec(CompressionCodecName.SNAPPY))
                                   .to(opts.getSinkLocation()))
   ```
   
   Toy project: https://github.com/psolomin/beam-playground/tree/parallelism-issue/kinesis-io-with-enhanced-fan-out#vanilla-flink
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1776656656

   I finally had more time to look into the code. KinesisIO looks good in terms of default watermarking. What is probably causing the issue is [this line][1] in your test application. A processing-time watermark will move the watermark forward significantly on restore. It is actually strange (and possibly the result of some other coincidence) that you don't see data loss on *every restart* of the pipeline. I think the issue will disappear if you use the default watermarking policy and set allowed lateness as [this comment][2] suggests.
   
    [1]: https://github.com/psolomin/beam-playground/blob/e0f1834d6ee76a796d6d21836e1d04140e6d9ca2/kinesis-io-with-enhanced-fan-out/src/main/java/com/psolomin/consumer/KinesisToFilePipeline.java#L52
    [2]: https://github.com/apache/beam/blob/bf5ded44e6ee7e9e752f44379df43a3aa453fc7e/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java#L127
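   
   To illustrate the effect described above, here is a minimal sketch in plain Java (it does not use Beam). It assumes 60-second fixed windows, zero allowed lateness, and made-up timestamps; `RestoreWatermarkSketch`, `windowEnd` and `countLate` are hypothetical names. It counts how many backlog records already belong to a closed window when the watermark jumps to the restore time, compared with a watermark that trails the oldest unread record.
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.List;
   
   final class RestoreWatermarkSketch {
       static final Duration WINDOW = Duration.ofSeconds(60);
   
       /** End (exclusive) of the fixed window that contains the timestamp. */
       static Instant windowEnd(Instant ts) {
           long size = WINDOW.toMillis();
           long start = ts.toEpochMilli() - Math.floorMod(ts.toEpochMilli(), size);
           return Instant.ofEpochMilli(start + size);
       }
   
       /** Records whose window has already closed at the given watermark (zero allowed lateness). */
       static long countLate(List<Instant> arrivalTimes, Instant watermark) {
           return arrivalTimes.stream()
                   .filter(ts -> windowEnd(ts).minusMillis(1).isBefore(watermark))
                   .count();
       }
   
       public static void main(String[] args) {
           // Illustrative values only: the job is stopped, records keep arriving, then it is restored.
           Instant stop = Instant.parse("2023-10-23T09:45:00Z");
           List<Instant> backlog =
                   List.of(stop.plusSeconds(10), stop.plusSeconds(70), stop.plusSeconds(130));
           Instant restore = stop.plusSeconds(300);
   
           // Processing-time style: the watermark is "now" at restore time.
           System.out.println(countLate(backlog, restore)); // 3
           // Arrival-time style: the watermark trails the oldest unread record.
           System.out.println(countLate(backlog, backlog.get(0))); // 0
       }
   }
   ```
   
   In this simplified model every backlog record is late under the processing-time watermark, which would match data being dropped after a restore.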


Re: [I] [Bug]: KinesisIO processing-time watermarking can cause data loss [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1801643738

   It is related, but not 100% implemented under #28763.
   We first need to add the ability to specify the timestamp extraction function in the watermark policy (the question is then whether it should still be called a watermark policy, or be renamed to timestamp policy, similar to KafkaIO), and then use this to set processing-time timestamps on records when the processing-time policy is used.
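   
   A rough sketch of what such a policy could look like, in plain Java. This is not the KinesisIO or KafkaIO API: the interface `TimestampPolicy`, its methods, and `ProcessingTimePolicy` are hypothetical names used only to show one object deciding both the record timestamp and the watermark.
   
   ```java
   import java.time.Instant;
   import java.util.function.Supplier;
   
   final class TimestampPolicySketch {
       /** Hypothetical policy that supplies both the record timestamp and the watermark. */
       interface TimestampPolicy<T> {
           Instant timestampFor(T record);
   
           Instant watermark();
       }
   
       /** Processing-time variant: both the record timestamp and the watermark come from the clock. */
       static final class ProcessingTimePolicy<T> implements TimestampPolicy<T> {
           private final Supplier<Instant> clock;
   
           ProcessingTimePolicy(Supplier<Instant> clock) {
               this.clock = clock;
           }
   
           @Override
           public Instant timestampFor(T record) {
               return clock.get();
           }
   
           @Override
           public Instant watermark() {
               return clock.get();
           }
       }
   
       public static void main(String[] args) {
           // A fixed clock keeps the example deterministic.
           Instant now = Instant.parse("2023-11-08T12:00:00Z");
           TimestampPolicy<String> policy = new ProcessingTimePolicy<>(() -> now);
           Instant ts = policy.timestampFor("payload");
           // The record is stamped with the same clock as the watermark, so it is not behind it.
           System.out.println(ts.isBefore(policy.watermark())); // false
       }
   }
   ```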


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1763515682

   Hi @je-ik 
   
   >  Does that mean you compare the outputs of the rescaled job with the provided inputs
   
   Yes, I publish messages with IDs to Kinesis, then consume them, save in files and check those files with other tools (like Spark). Is there a better approach to check data integrity when we run parallel consumer instances?
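   
   The check described above boils down to a set comparison. A minimal sketch, assuming the published and consumed message IDs have already been loaded into memory as strings (the class and method names are hypothetical and not part of the test project):
   
   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.TreeSet;
   
   final class IntegrityCheck {
       /** IDs that were published but never showed up in the output. */
       static Set<String> missing(List<String> published, List<String> consumed) {
           Set<String> result = new TreeSet<>(published);
           result.removeAll(new HashSet<>(consumed));
           return result;
       }
   
       /** IDs that appear more than once in the output. */
       static Set<String> duplicates(List<String> consumed) {
           Set<String> seen = new HashSet<>();
           Set<String> dups = new TreeSet<>();
           for (String id : consumed) {
               if (!seen.add(id)) {
                   dups.add(id);
               }
           }
           return dups;
       }
   
       public static void main(String[] args) {
           List<String> published = List.of("1", "2", "3", "4");
           List<String> consumed = List.of("1", "3", "3");
           System.out.println("missing: " + missing(published, consumed)); // missing: [2, 4]
           System.out.println("duplicates: " + duplicates(consumed)); // duplicates: [3]
       }
   }
   ```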
   
   > does this still happen on 2.50.0?
   
   I ran a couple of tests with 2.50.0 too:
   
   1. Set the number of Kinesis stream shards to 1, left pipeline parallelism = 3 and restarted from a savepoint with parallelism = 2 - this did not cause data loss.
   
   2. Set the number of Kinesis stream shards to 3, left pipeline parallelism = 3 and restarted from a savepoint with parallelism = 2 - this caused data loss. After another restart with parallelism = 3 all data came through.
   
   3. Ran the same test in Flink with `KafkaIO` and **was not able to reproduce** the issue with a similar setup of 3 partitions and initial parallelism = 3
   
   I will assign the issue to myself and will run more tests and try to narrow down the issue.


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1763516499

   .take-issue


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1774933051

   @je-ik 
   
   Thanks for the tip. I've enabled the logs you mentioned, and I've seen logs like this after reducing parallelism 3 -> 2:
   
   > LateDataFilter: Dropping element at 2023-10-23T09:47:44.305Z for key:key: 0 shard: 0; window:[2023-10-23T09:47:00.000Z..2023-10-23T09:48:00.000Z) since too far behind inputWatermark:2023-10-23T09:50:03.009Z; outputWatermark:2023-10-23T09:50:03.009Z
   
   The other logs say
   
   > WindowTracing - describePane: ON_TIME
   
   and they are all `ON_TIME`, which contradicts the `Dropping element ...` logs.
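   
   For reference, the numbers in the `Dropping element` line can be checked by hand. The sketch below is a simplified model of a late-data check, not Beam's actual code: it treats a window as expired once the input watermark has passed the window's last timestamp plus the allowed lateness, and it assumes zero allowed lateness because the pipeline does not set one. `LateDataCheck` and `isWindowExpired` are hypothetical names.
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   
   final class LateDataCheck {
       /** True if the watermark is past the window's last timestamp plus the allowed lateness. */
       static boolean isWindowExpired(Instant windowEnd, Duration allowedLateness, Instant inputWatermark) {
           Instant maxTimestamp = windowEnd.minusMillis(1); // the window end is exclusive
           return maxTimestamp.plus(allowedLateness).isBefore(inputWatermark);
       }
   
       public static void main(String[] args) {
           // Values taken from the log line above.
           Instant windowEnd = Instant.parse("2023-10-23T09:48:00.000Z");
           Instant watermark = Instant.parse("2023-10-23T09:50:03.009Z");
   
           System.out.println(isWindowExpired(windowEnd, Duration.ZERO, watermark)); // true
           // With a hypothetical 5 minutes of allowed lateness the window would still be open.
           System.out.println(isWindowExpired(windowEnd, Duration.ofMinutes(5), watermark)); // false
       }
   }
   ```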
   
   >  It might be the case that watermarks are not reconstructed correctly in this case in KinesisIO.
   
   Is it possible this issue is related to https://github.com/apache/beam/issues/28760?
   
   I also tried to assign timestamps to records:
   
   ```
   public class AssignTimestampsDoFn extends DoFn<KinesisRecord, KinesisRecord> {
       @ProcessElement
       public void processElement(@Element KinesisRecord input, OutputReceiver<KinesisRecord> out) {
           out.outputWithTimestamp(input, Instant.now());
       }
   
       @Override
       public Duration getAllowedTimestampSkew() {
           return Duration.standardSeconds(5);
       }
   }
   
   ```
   
   ```
   PCollection<KinesisRecord> windowedRecords = p.apply("Source", reader)
                   .apply("Assign ts", ParDo.of(new AssignTimestampsDoFn()))
                   .apply("Fixed windows", Window.<KinesisRecord>into(FixedWindows.of(Duration.standardSeconds(60))));
   
   ```
   
   After this, reducing parallelism no longer caused data loss.
   
   What might be the next steps, in your opinion? Moving forward with fixing https://github.com/apache/beam/issues/28760 in the way @sjmittal proposed?


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1764335135

   Sorry, I forgot one "detail". The `Read` transform is by default expanded to SDF as well, so in order to make `KinesisIO` and `KafkaIO` use the same code path, it is necessary to pass the `--experiments=use_deprecated_read` to the `KinesisIO` pipeline as well.
   Does the flag change the behavior?


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1794859833

   @psolomin The mailing list thread is not conclusive, but generally I think we can try to unify the behavior of KafkaIO and KinesisIO, which would be option 2). Would you like to work on the change?


[GitHub] [beam] mosche commented on issue #25975: [Bug]: Reducing parallelism in FlinkRunner leads to a data loss

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1510913893

   Increasing this to P1 as there's a risk of data loss.


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1763861262

   The biggest difference between `KinesisIO` and `KafkaIO` seems to be that (by default) `KinesisIO` uses the `Read` transform, while `KafkaIO` uses SDF. Can you try setting `--experiments=use_deprecated_read` for `KafkaIO`? That should change the DAG so that it uses `Read` as well.


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1764127126

   > Can you try setting --experiments=use_deprecated_read for KafkaIO?
   
   Tried. It still worked without issues: reducing Flink parallelism from 3 to 2, with 3 partitions in the source topic, did not cause record loss. This is how I run it:
   
   ```
   docker exec -u flink -it kafka-consumer-flink-jm-1 flink run \
   	-s file:///mnt/savepoints/savepoint-< savepoint id > \
   	--class com.kfk.consumer.FlinkRunnerMain --detached \
   	/mnt/artifacts/example-com.kfk.consumer.FlinkRunnerMain-bundled-0.1-SNAPSHOT.jar \
   	--bootstrapServers=kafka:9092 --inputTopics=raw,raw2 --outputDir=/mnt/output \
   	--autoWatermarkInterval=10000 \
   	--externalizedCheckpointsEnabled=true \
   	--checkpointingMode=EXACTLY_ONCE \
   	--numConcurrentCheckpoints=1 \
   	--checkpointTimeoutMillis=500000 \
   	--checkpointingInterval=60000 \
   	--minPauseBetweenCheckpoints=5000 \
   	--stateBackend=rocksdb \
   	--stateBackendStoragePath=file:///tmp/flink-state \
   	--parallelism=2 \
           --experiments=use_deprecated_read
   
   ```
   


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1754927422

   @psolomin thanks for reporting this! Can you please clarify what "bundles are not dispatched" means? Does that mean you compare the outputs of the rescaled job with the provided inputs and those do not match or is it something else? Did you try newer versions of Beam, does this still happen on 2.50.0?


Re: [I] [Bug]: KinesisIO processing-time watermarking can cause data loss [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1801962016

   Ok, I will check deeper. Thanks for clarifying.


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1764561560

   > Interestingly, Flink UI reports the same Records sent from this stage when I do both parallelism = 2 and = 3
   
   This might be a hint. On restore, watermarks are recomputed from scratch. It might be the case that watermarks are not reconstructed correctly in this case in `KinesisIO`.
   
   Can you either check whether you see the metric `droppedDueToLateness`, or enable debug logging to see [this log][1]?
   
    [1]: https://github.com/apache/beam/blob/6b32a3fb7d97ec932bde8c0d8c8b812ed2e940da/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
   
   


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1780742092

   @je-ik 
   
   > Processing time watermark will move watermark significantly on restore.
   
   I dropped it, but kept the simple window:
   
   >                 .apply("Fixed windows", Window.<KinesisRecord>into(FixedWindows.of(Duration.standardSeconds(60))));
   
   This actually made all records come through, even when I reduced parallelism.
   
   > if you use the default watermarking policy and set allowed lateness as suggests [this comment]
   
   This comment is hard to find - it's not in the `KinesisIO` javadoc.
   
   Overall, I am quite confused, because it took me multiple trials to understand which combinations of watermark policy and windowing avoid data loss. When developing a pipeline, I use `.withProcessingTimeWatermarkPolicy()` to express "I do not care when records were produced, I just want to pull them and put them into windows based on their time of arrival in my app". I think some IO changes are needed; I see these options:
   
   1. Add a warning to `withProcessingTimeWatermarkPolicy` javadoc saying that it causes data loss unless one adds custom processing time timestamps to the records
   2. Make the IO emit records with `Instant.now()` timestamps if `withProcessingTimeWatermarkPolicy` is set (let's call it the KafkaIO way)
   3. Deprecate `withProcessingTimeWatermarkPolicy` due to potential data losses
   
   What do you think?


Re: [I] [Bug]: KinesisIO processing-time watermarking can cause data loss [beam]

Posted by "shunping (via GitHub)" <gi...@apache.org>.
shunping commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1989339456

   Hi @psolomin, any update on this issue?


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1764531872

   @je-ik tried that flag with `KinesisIO` too. Still observe the same pattern:
   
   - start from a savepoint with reduced parallelism -> data loss
   - start again from that savepoint with previous parallelism -> no data loss
   
   Interestingly, Flink UI reports the same `Records sent` from this stage when I do both parallelism = 2 and = 3:
   
   ```
    Source: Source/Read(KinesisSource)
   +- Flat Map
      +- Fixed windows/Window.Assign.out
         +- Process payloads/ParMultiDo(SlowProcessor)
            +- Maybe fail/ParMultiDo(FailingProcessor)
               +- Parse payloads/ParMultiDo(ConsumedEventDeserializer)
                  +- Sink to S3/WriteFiles/WriteShardedBundlesToTempFiles/ApplyShardingKey/ParMultiDo(ApplyShardingFunction)
                     +- ToBinaryKeyedWorkItem
   ```
   
   My Kinesis and Kafka pipelines actually do slightly different things, so I will try to converge them as much as possible before concluding anything further.


Re: [I] [Bug]: KinesisIO processing-time watermarking can cause data loss [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1801565231

   @je-ik 
   
   It seems option 2 is being implemented under https://github.com/apache/beam/pull/28763
   I will look into that change within the next 2 days.


Re: [I] [Bug]: Reducing parallelism in FlinkRunner leads to a data loss [beam]

Posted by "je-ik (via GitHub)" <gi...@apache.org>.
je-ik commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1782510940

   I'd go with 2) or 3), but I don't have a strong preference between the two. I created a [mailing list thread][1] for that.
   
   [1]: https://lists.apache.org/thread/80m2j3q98f70f9kfhoon4f87sqxc9p0k


Re: [I] [Bug]: KinesisIO processing-time watermarking can cause data loss [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #25975:
URL: https://github.com/apache/beam/issues/25975#issuecomment-1998616364

   Hi @shunping, no updates yet, I have not had spare time. I plan to have some time in June.
   
   The guidance in the previous comments may be enough for someone else's PR. In that case I can re-assign the issue and will help with reviewing.

