Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/16 08:13:00 UTC

[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

    [ https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477014#comment-16477014 ] 

ASF GitHub Bot commented on FLINK-9374:
---------------------------------------

GitHub user fmthoma opened a pull request:

    https://github.com/apache/flink/pull/6021

    [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpressuring

    ## What is the purpose of the change
    
    The `FlinkKinesisProducer` just accepts records and forwards them to a `KinesisProducer` from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent.
    
    Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis.
    
    One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this will lead to data loss under high loads.
    
    Currently, the only time the queue is flushed is during checkpointing: the `FlinkKinesisProducer` consumes records at an arbitrary rate, either until a checkpoint is reached (at which point it blocks until the queue is flushed), or until it runs out of memory, whichever comes first. (This is made worse by the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.)
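    
    For illustration, a minimal sketch of this checkpoint-time flush, assuming the KPL's `getOutstandingRecordsCount()` and `flush()` methods (the actual Flink code is structured differently):
    
    ```java
    // Sketch: at checkpoint time the producer blocks until the KPL queue is
    // fully drained; between checkpoints, nothing bounds the queue size.
    private void flushSync() throws InterruptedException {
        while (producer.getOutstandingRecordsCount() > 0) {
            producer.flush();   // ask the KPL to send everything it holds
            Thread.sleep(500);  // poll until the queue is empty
        }
    }
    ```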
    
    My proposed solution is to add a config option `queueLimit` to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the `FlinkKinesisProducer` triggers a `flush()` and blocks until the queue length is below the limit again. This automatically leads to backpressure, since the `FlinkKinesisProducer` cannot accept records while waiting. For compatibility, `queueLimit` is set to `Integer.MAX_VALUE` by default, so the behavior is unchanged unless a client explicitly sets the value. Unfortunately, a sensible default cannot be chosen, since appropriate values depend on the record size: the limit should be set so that roughly 10–100MB of records per shard accumulate before flushing (for example, with 1KB records, a limit of 10,000–100,000 records per shard), otherwise the maximum Kinesis throughput may not be reached.
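    
    A minimal sketch of the proposed mechanism, again assuming the KPL's `getOutstandingRecordsCount()` and `flush()` API (the method and field names here are illustrative, not necessarily those of the actual patch):
    
    ```java
    // Sketch: before handing a new record to the KPL, block while the queue
    // is at or above the configured limit. Blocking in the sink is what
    // propagates backpressure upstream through Flink.
    private void enforceQueueLimit() throws InterruptedException {
        while (producer.getOutstandingRecordsCount() >= queueLimit) {
            producer.flush();   // trigger sending of queued records
            Thread.sleep(500);  // wait for the queue to drop below the limit
        }
    }
    ```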
    
    ## Brief change log
    
    * Add a `queueLimit` setting to `FlinkKinesisProducer` to limit the number of in-flight records in the Kinesis Producer Library, and enable backpressure when the limit is exceeded (see the usage sketch below)
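    
    For example, configuring the limit from user code might look like this (the setter name `setQueueLimit` is an assumption based on the option name; the region property and schema are placeholders):
    
    ```java
    Properties producerConfig = new Properties();
    producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-central-1");
    
    FlinkKinesisProducer<String> producer =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    // Backpressure once 100,000 records are queued inside the KPL.
    producer.setQueueLimit(100_000);
    ```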
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    * Added unit test
    * Manually verified the change by running a job that produces to a 2-shard Kinesis stream. The input rate is limited by Kinesis (verified that the Kinesis stream is indeed at maximum capacity).
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes, but backwards compatible (option was added)
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): don't know
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fmthoma/flink queueLimit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6021.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6021
    
----
commit 9a2930cbbec4cd6979e6bfacb741da820cdbb284
Author: Franz Thoma <fr...@...>
Date:   2018-05-09T06:27:47Z

    [FLINK-9374] [kinesis] Add hardcoded queue size limit of 100000 records

commit e41037eb5e07efb73ded7f945111d0d5f6e9b18b
Author: Franz Thoma <fr...@...>
Date:   2018-05-09T06:56:53Z

    [FLINK-9374] [kinesis] Expose queueLimit option

commit 9222849869da0018718072c33b32d8d935f3dec4
Author: Franz Thoma <fr...@...>
Date:   2018-05-09T07:08:11Z

    [FLINK-9374] [kinesis] Refactor test: Mock implementation of flush() only flushes *some*, not *all* records

commit f062c5b9cd2e572da9fef0cdb5c8ea89af2a228c
Author: Franz Thoma <fr...@...>
Date:   2018-05-09T11:59:05Z

    [FLINK-9374] [kinesis] adapt tests

----


> Flink Kinesis Producer does not backpressure
> --------------------------------------------
>
>                 Key: FLINK-9374
>                 URL: https://issues.apache.org/jira/browse/FLINK-9374
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Franz Thoma
>            Priority: Critical
>         Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this will lead to data loss under high loads.
> Currently, the only time the queue is flushed is during checkpointing: the {{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until a checkpoint is reached (at which point it blocks until the queue is flushed), or until it runs out of memory, whichever comes first. (This is made worse by the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.) The implicit rate limit due to checkpointing leads to a ragged throughput graph like this (the periods with zero throughput are the wait times before a checkpoint):
> !before.png! Throughput limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the {{FlinkKinesisProducer}} triggers a {{flush()}} and blocks until the queue length is below the limit again. This automatically leads to backpressure, since the {{FlinkKinesisProducer}} cannot accept records while waiting. For compatibility, {{queueLimit}} is set to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a client explicitly sets the value. Unfortunately, a sensible default cannot be chosen, since appropriate values depend on the record size: the limit should be set so that roughly 10–100MB of records per shard accumulate before flushing, otherwise the maximum Kinesis throughput may not be reached.
> !after.png! Throughput with a queue limit of 100000 records (the spikes are checkpoints, where the queue is still flushed completely)


