Posted to issues@flink.apache.org by fmthoma <gi...@git.apache.org> on 2018/05/16 08:12:18 UTC

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

GitHub user fmthoma opened a pull request:

    https://github.com/apache/flink/pull/6021

    [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpressuring

    ## What is the purpose of the change
    
    The `FlinkKinesisProducer` just accepts records and forwards them to a `KinesisProducer` from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent.
    
    Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis.
    
    One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this will lead to data loss under high loads.
    
    Currently the only time the queue is flushed is during checkpointing: `FlinkKinesisProducer` consumes records at an arbitrary rate, either until a checkpoint is reached (at which point it waits until the queue is flushed), or until it runs out of memory, whichever happens first. (This is made worse by the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.)
    
    My proposed solution is to add a config option `queueLimit` to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the `FlinkKinesisProducer` should trigger a `flush()` and wait (blocking) until the queue length is below the limit again. This automatically leads to backpressuring, since the `FlinkKinesisProducer` cannot accept records while waiting. For compatibility, `queueLimit` is set to `Integer.MAX_VALUE` by default, so the behavior is unchanged unless a client explicitly sets the value. Setting a »sane« default value is not possible unfortunately, since sensible values for the limit depend on the record size (the limit should be chosen so that about 10–100MB of records per shard are accumulated before flushing, otherwise the maximum Kinesis throughput may not be reached).
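    
    For illustration, a minimal sketch of the blocking behaviour described above (not the code from this PR). `QueueingProducer` is a hypothetical stand-in for the KPL's `KinesisProducer`, reduced to `getOutstandingRecordsCount()` and `flush()`; `QueueLimitEnforcer` and the `pauseMillis` parameter are assumptions made for the example.
    
    ```java
    // Hypothetical stand-in for the KPL producer, reduced to the two calls used below.
    interface QueueingProducer {
        int getOutstandingRecordsCount();
        void flush();
    }
    
    // Illustrative helper, not part of Flink or the KPL.
    final class QueueLimitEnforcer {
        private final QueueingProducer producer;
        private final int queueLimit;
        private final long pauseMillis;
    
        QueueLimitEnforcer(QueueingProducer producer, int queueLimit, long pauseMillis) {
            this.producer = producer;
            this.queueLimit = queueLimit;
            this.pauseMillis = pauseMillis;
        }
    
        /** Blocks until the queue is below the limit; returns the number of flush cycles. */
        int enforce() throws InterruptedException {
            int cycles = 0;
            while (producer.getOutstandingRecordsCount() >= queueLimit) {
                producer.flush();          // ask the producer to send buffered records
                Thread.sleep(pauseMillis); // the caller accepts no records meanwhile: backpressure
                cycles++;
            }
            return cycles;
        }
    }
    ```
    
    With `queueLimit` left at `Integer.MAX_VALUE` the loop body is never entered in practice, which matches the "behavior is unchanged by default" claim.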
    
    ## Brief change log
    
    * Add a `queueLimit` setting to `FlinkKinesisProducer` to limit the number of in-flight records in the Kinesis Producer Library, and enable backpressuring if the limit is exceeded
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    * Added unit test
    * Manually verified the change by running a job that produces to a 2-shard Kinesis stream. The input rate is limited by Kinesis (verified that the Kinesis stream is indeed at maximum capacity).
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes, but backwards compatible (option was added)
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): don't know
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fmthoma/flink queueLimit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6021.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6021
    
----
commit 9a2930cbbec4cd6979e6bfacb741da820cdbb284
Author: Franz Thoma <fr...@...>
Date:   2018-05-09T06:27:47Z

    [FLINK-9374] [kinesis] Add hardcoded queue size limit of 100000 records

commit e41037eb5e07efb73ded7f945111d0d5f6e9b18b
Author: Franz Thoma <fr...@...>
Date:   2018-05-09T06:56:53Z

    [FLINK-9374] [kinesis] Expose queueLimit option

commit 9222849869da0018718072c33b32d8d935f3dec4
Author: Franz Thoma <fr...@...>
Date:   2018-05-09T07:08:11Z

    [FLINK-9374] [kinesis] Refactor test: Mock implementation of flush() only flushes *some*, not *all* records

commit f062c5b9cd2e572da9fef0cdb5c8ea89af2a228c
Author: Franz Thoma <fr...@...>
Date:   2018-05-09T11:59:05Z

    [FLINK-9374] [kinesis] adapt tests

----


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    @tzulitai I added some docs.
    
    As for the `flush()` vs. just waiting: As I see it, the [`RecordMaxBufferedTime`](https://github.com/awslabs/amazon-kinesis-producer/blob/ce77505306c104a6016b0c081df4715d05ac9201/java/amazon-kinesis-producer-sample/default_config.properties#L239) option (default: 100 milliseconds) limits the time a record should be kept in the queue in the absence of pressure. Hence I think that the `flush()` is indeed not necessary, unless a user purposefully sets the `queueLimit` too low *and* the `RecordMaxBufferedTime` too high.
    
    Also, I added [another comment](https://github.com/apache/flink/pull/6021#discussion_r190154347) concerning the `sleep` vs `wait`, which GitHub unfortunately displays as »outdated«.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197071117
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void enforceQueueLimit() {
    --- End diff --
    
    Would like to request one more slight change here:
    Let this method return a boolean that indicates whether or not flushing occurred.
    
    The caller of this method can then use the flag to decide whether or not `checkAndPropagateAsyncError` is required.
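    
    A rough sketch of that control flow, assuming a heavily simplified sink: `BackpressuringSinkSketch`, `outstandingRecords()`, `waitForProgress()` and `send()` are hypothetical names for this example, and only `enforceQueueLimit` and `checkAndPropagateAsyncError` are taken from the diff.
    
    ```java
    // Hypothetical, simplified sink: only the control flow of invoke() is modelled.
    abstract class BackpressuringSinkSketch {
        private final int queueLimit;
    
        BackpressuringSinkSketch(int queueLimit) {
            this.queueLimit = queueLimit;
        }
    
        protected abstract int outstandingRecords();
        protected abstract void waitForProgress();
        protected abstract void checkAndPropagateAsyncError();
        protected abstract void send(String record);
    
        /** Returns true if the caller had to wait at least once. */
        private boolean enforceQueueLimit() {
            boolean waited = false;
            while (outstandingRecords() >= queueLimit) {
                waitForProgress();
                waited = true;
            }
            return waited;
        }
    
        void invoke(String record) {
            checkAndPropagateAsyncError();
            if (enforceQueueLimit()) {
                // Errors may have surfaced while we were blocked, so check again.
                checkAndPropagateAsyncError();
            }
            send(record);
        }
    }
    ```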


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189432840
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -218,6 +232,8 @@ public void invoke(OUT value, Context context) throws Exception {
     			throw new RuntimeException("Kinesis producer has been closed");
     		}
     
    +		checkAndPropagateAsyncError();
    +		checkQueueLimit();
     		checkAndPropagateAsyncError();
    --- End diff --
    
    `snapshotState()` also checks twice explicitly, and I think it makes sense to have the two checks on the same level. But I won't insist on that, if you prefer having it more implicitly.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197065346
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -55,6 +58,13 @@
     @PublicEvolving
     public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
     
    +	public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
    +
    +	public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
    +
    +	public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
    +
    +
    --- End diff --
    
    nit: unnecessary line


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189174902
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    --- End diff --
    
    probably rename it to something different, e.g. `enforceQueueLimit()`? because it clearly does more than just 'check'


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    @fmthoma I've merged this manually. Thanks for the contribution.
    Could you close this PR?


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197068370
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.util;
    +
    +public class TimeoutLatch {
    --- End diff --
    
    This needs to be annotated as `@Internal`


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189432802
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    --- End diff --
    
    I agree.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189177308
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			producer.flush();
    --- End diff --
    
    When we add a record to the producer queue via `producer.addUserRecord(...)`, we get a callback. We can use that callback to notify the blocking operation in `checkQueueLimit`.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189433306
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			producer.flush();
    --- End diff --
    
    So I'd suggest adding `producer.notifyAll()` to both `onSuccess()` and `onFailure()` in the callback, and replacing the `Thread.sleep(500)` with `producer.wait(500)`. This way we re-check with every record sent out, or after 0.5 seconds at the latest.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189176708
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    --- End diff --
    
    A more important thing I would count and log here is how many times it has already tried to flush within a single call of `enforceQueueLimit()`. We can set a threshold, say 10 times, and then log a message saying that KPL is leading to backpressure


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma closed the pull request at:

    https://github.com/apache/flink/pull/6021


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    Thanks @fmthoma, will proceed to merge this ..


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197063764
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void enforceQueueLimit() {
    --- End diff --
    
    A user-provided queue limit supplier function sounds like a good idea.
    As you mentioned, this can come as a follow-up PR.


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    @fmthoma I think this might benefit from actual documentation, not only Javadocs.


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    @tzulitai @bowenli86 I've made some more changes while investigating awslabs/amazon-kinesis-producer#183:
    
    * I've followed your suggestion and used `wait()` instead of `Thread.sleep()`, see `TimeoutLatch`. This allows much smaller queue sizes.
    * The timeout of 500ms is too high, I've lowered it to 100ms.
    * I've added two metrics: A `Gauge` for the `outstandingRecordsCount`, and a `Counter` for the backpressure cycles (i.e. the number of times the check `outstandingRecordsCount < queueLimit` fails).
    
    I updated the documentation, the recommended queue limit is now much lower.
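    
    A sketch of how a latch with a timed wait could be built on `wait()`/`notifyAll()`. The name is modelled on the `TimeoutLatch` mentioned above, but the body, the `trigger()` method and the `waiting` flag are assumptions for illustration, not necessarily the code in this PR.
    
    ```java
    // Illustrative latch: wait for a notification, but never longer than a timeout.
    final class TimeoutLatchSketch {
        private final Object lock = new Object();
        private volatile boolean waiting;
    
        /** Blocks until trigger() is called or timeoutMillis have elapsed. */
        void await(long timeoutMillis) throws InterruptedException {
            synchronized (lock) {
                waiting = true;
                lock.wait(timeoutMillis);
            }
        }
    
        /** Wakes up a waiting thread; skips the lock entirely when nobody is waiting. */
        void trigger() {
            if (waiting) {
                synchronized (lock) {
                    waiting = false;
                    lock.notifyAll();
                }
            }
        }
    }
    ```
    
    The unsynchronized read of `waiting` keeps the callback path cheap when there is no backpressure. A `trigger()` that races with the start of `await()` can be missed, and `wait()` may wake up spuriously, so the caller is expected to re-check the queue length in a loop; the timeout bounds the cost of a missed notification.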


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r190154347
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			producer.flush();
    --- End diff --
    
    @tzulitai @bowenli86 I've given this some more thought. `wait()`/`notify()` requires a `synchronized` block. So if we just notify some lock in the callback, this would lead to synchronization overhead. We'd have to recognize a transition from »queue size > queue limit« to »queue size <= queue limit« and only synchronize then, which adds a lot of complexity.
    
    On the other hand: Kinesis accepts up to 1MB per second per shard. The queue limit should be chosen so that some data can still be accumulated before sending, i.e. more than a second of data (more than 1MB per shard). If the queue limit is chosen adequately, then the `Thread.sleep(500)` does no harm, as the queued records take more than one second to flush anyway. If the queue limit is chosen too low, then sleeping half a second may be too long, but we would not reach maximum throughput anyway because of the limitation on the number of `Put` requests.
    
    I think it's not worth the additional complexity.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r192837000
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -180,9 +204,16 @@ public void open(Configuration parameters) throws Exception {
     		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
     
     		producer = getKinesisProducer(producerConfig);
    +
    +		final MetricGroup kinesisMectricGroup = getRuntimeContext().getMetricGroup().addGroup("kinesisProducer");
    --- End diff --
    
    minor: better to make these three strings constant (static final String) for easier maintenance.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by gliu6 <gi...@git.apache.org>.
Github user gliu6 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r196952063
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void enforceQueueLimit() {
    --- End diff --
    
    Btw, I am not requesting any changes; the PR looks good to me for its defined purpose.
    
    I just wonder how to configure this easily. Thinking about it a bit more: would it be better to expose a `queue size` to the user instead of a `queue limit (number)`? Inside the FKP class we could keep an integer `recordSize`, and in the `invoke` function update it dynamically as a moving average of `serialized.remaining()`.
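    
    One way this idea could look, as a sketch only: `RecordSizeEstimator` is a hypothetical helper, and the exponential moving average with smoothing factor `alpha` is an assumption, since the comment does not say which kind of moving average is meant. The value passed to `observe()` would be something like `serialized.remaining()`.
    
    ```java
    // Hypothetical helper: tracks an average record size and turns a byte budget
    // into a record-count limit.
    final class RecordSizeEstimator {
        private final double alpha;   // smoothing factor in (0, 1]
        private double averageBytes;  // meaningful only once initialized is true
        private boolean initialized;
    
        RecordSizeEstimator(double alpha) {
            if (!(alpha > 0.0 && alpha <= 1.0)) {
                throw new IllegalArgumentException("alpha must be in (0, 1], got " + alpha);
            }
            this.alpha = alpha;
        }
    
        /** Feeds the size of one serialized record into the moving average. */
        void observe(int recordBytes) {
            if (!initialized) {
                averageBytes = recordBytes;
                initialized = true;
            } else {
                averageBytes = alpha * recordBytes + (1.0 - alpha) * averageBytes;
            }
        }
    
        /** Converts a byte budget into a record-count limit of at least 1. */
        int queueLimitFor(long maxQueuedBytes) {
            if (!initialized || averageBytes <= 0.0) {
                return Integer.MAX_VALUE; // nothing observed yet: effectively unlimited
            }
            long limit = (long) (maxQueuedBytes / averageBytes);
            return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, limit));
        }
    }
    ```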


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197067733
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void enforceQueueLimit() {
    +		int attempt = 0;
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			backpressureCycles.inc();
    +			if (attempt >= 10) {
    +				LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt);
    +			}
    +			attempt++;
    +			try {
    +				backpressureLatch.await(100);
    --- End diff --
    
    We might want to make the wait time configurable? (as a separate PR)
    My reasoning is that it directly affects how long until the "flush taking unusually long" message starts popping up.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189164871
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			producer.flush();
    --- End diff --
    
    Do we have to do a flush here? Shouldn't the KPL child process process the user records in the background without an explicit flush call?
    
    If so, perhaps a more graceful solution here is to wait on a local object, and notify it to wake up in the asynchronous producer write callbacks. After being notified, we check the `getOutstandingRecordsCount` against the queueLimit, and either wait more or escape the loop.
    
    What do you think?


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    @tzulitai I agree on adding additional docs, where do you suggest I should put them? In the Javadoc on `setQueueLimit()`?
    
    My current suggestion is to look at the size of your individual records, and choose the queue limit so that about 10MB per shard are aggregated. 1MB would be too small (since the KPL aggregates the user records to 1MB batches). But I'll run some more performance tests, in particular also with the `wait()`/`notify()` change you suggested above.
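    
    The arithmetic behind that rule of thumb, as a small sketch. `QueueLimitSizing.recordsFor` is a hypothetical helper, and the 10MB-per-shard target is only the working suggestion from this comment, not a measured recommendation.
    
    ```java
    // Hypothetical helper: how many records add up to a byte target across all shards.
    final class QueueLimitSizing {
        static int recordsFor(long bytesPerShard, int shardCount, int averageRecordBytes) {
            if (bytesPerShard <= 0 || shardCount <= 0 || averageRecordBytes <= 0) {
                throw new IllegalArgumentException("all arguments must be positive");
            }
            long records = Math.multiplyExact(bytesPerShard, (long) shardCount) / averageRecordBytes;
            return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, records));
        }
    }
    ```
    
    For example, 10,000,000 bytes per shard on a 2-shard stream with records of about 1,000 bytes gives a limit of 20,000 records.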


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    @fmthoma yes, that would be great.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189175770
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			producer.flush();
    --- End diff --
    
    Looks like [KinesisProducer](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java) doesn't have a way to get child process's callback. Or maybe I misunderstood your proposal, Gordon? 


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189432920
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    --- End diff --
    
    You mean, log a message if we have checked more than 10 times (5 seconds) for one record? That makes sense. But we shouldn't log every time we reach the threshold, that would lead to log spam.
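    
    A sketch of one way to warn without flooding the log, assuming the threshold of 10 attempts from the comment above. `BackpressureWarning`, the `pause` hook and the `log` consumer are hypothetical; a real implementation would use the class's own logger and wait mechanism.
    
    ```java
    import java.util.function.Consumer;
    import java.util.function.IntSupplier;
    
    // Illustrative helper: re-check the queue until it is below the limit and
    // warn at most once per call.
    final class BackpressureWarning {
        static final int WARN_AFTER_ATTEMPTS = 10; // threshold suggested in the review
    
        static int waitBelowLimit(IntSupplier outstanding, int queueLimit,
                                  Runnable pause, Consumer<String> log) {
            int attempt = 0;
            while (outstanding.getAsInt() >= queueLimit) {
                if (attempt == WARN_AFTER_ATTEMPTS) {
                    log.accept("KPL queue still above the limit after " + attempt
                            + " attempts; the producer is backpressuring.");
                }
                attempt++;
                pause.run();
            }
            return attempt;
        }
    }
    ```
    
    Comparing with `==` rather than `>=` limits the output to one message per blocked call; it does not by itself prevent one message per record under sustained backpressure.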


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189432726
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			producer.flush();
    --- End diff --
    
    > Do we have to do a flush here? Shouldn't the KPL child process process the user records in the background without an explicit flush call?
    
    I don't know for sure, but here's what I think: The KPL aggregates records into batches of 1MB before sending them out, in order to achieve maximum throughput. If we reach the queue limit before a 1MB batch is full, the KPL may wait for some time before sending the aggregated record anyway. The `flush()` should trigger that immediately.
    
    Also, according to the Javadocs on `flush()`:
    
    ```
        /**
         * Instruct the child process to perform a flush, sending some of the
         * records it has buffered. Applies to all streams.
         * 
         * <p>
         * This does not guarantee that all buffered records will be sent, only that
         * most of them will; to flush all records and wait for completion, use
         * {@link #flushSync}.
         * 
         * <p>
         * This method returns immediately without blocking.
    ```
    
    So I think that `flush()` is still the right thing to do, although it might make sense to reduce the wait time. `notify()`ing a lock in the callback instead of waiting a fixed time might make more sense nevertheless, I will look into that.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189163394
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -218,6 +232,8 @@ public void invoke(OUT value, Context context) throws Exception {
     			throw new RuntimeException("Kinesis producer has been closed");
     		}
     
    +		checkAndPropagateAsyncError();
    +		checkQueueLimit();
     		checkAndPropagateAsyncError();
    --- End diff --
    
    This second check is to check any async errors that occurred during the queue flush, correct?
    If so, we should probably move this second invocation into `checkQueueLimit` to make this more implicit.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197142648
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -144,6 +163,17 @@ public void setFailOnError(boolean failOnError) {
     		this.failOnError = failOnError;
     	}
     
    +	/**
    +	 * The {@link KinesisProducer} holds an unbounded queue internally. To avoid memory
    +	 * problems under high loads, a limit can be employed above which the internal queue
    +	 * will be flushed, thereby applying backpressure.
    +	 *
    +	 * @param queueLimit The maximum length of the internal queue before backpressuring
    +	 */
    +	public void setQueueLimit(int queueLimit) {
    +		this.queueLimit = queueLimit;
    --- End diff --
    
    ✔ (`queueLimit > 0`)


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197143428
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void enforceQueueLimit() {
    --- End diff --
    
    ✔


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    @tzulitai Thanks for your last review comments! I addressed them, and rebased the branch against master.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r192861304
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -180,9 +204,16 @@ public void open(Configuration parameters) throws Exception {
     		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
     
     		producer = getKinesisProducer(producerConfig);
    +
    +		final MetricGroup kinesisMectricGroup = getRuntimeContext().getMetricGroup().addGroup("kinesisProducer");
    --- End diff --
    
    Sure.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189176320
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			producer.flush();
    +			try {
    +				Thread.sleep(500);
    +			} catch (InterruptedException e) {
    +				LOG.warn("Flushing was interrupted.");
    --- End diff --
    
    You can remove these two lines; they don't provide much value. After removal, it will be almost exactly how `KinesisProducer#flushSync` works:
    
    ```
    // KinesisProducer.java
    @Override
        public void flushSync() {
            while (getOutstandingRecordsCount() > 0) {
                flush();
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) { }
            }
        }
    ```


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197067136
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void enforceQueueLimit() {
    +		int attempt = 0;
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			backpressureCycles.inc();
    +			if (attempt >= 10) {
    +				LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt);
    +			}
    +			attempt++;
    +			try {
    +				backpressureLatch.await(100);
    --- End diff --
    
    I like this implementation a lot better now 👍 


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197064931
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void enforceQueueLimit() {
    --- End diff --
    
    The moving average calculation that you described could maybe just be an implementation of the limit supplier function.
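
    One way such a supplier might look, as a sketch under stated assumptions: it uses an exponential moving average of the record size, which may differ from the calculation discussed earlier in the thread, and the class `MovingAverageQueueLimit`, its `observe` method, and the byte budget parameter are hypothetical names.

    ```java
    import java.util.function.IntSupplier;

    final class MovingAverageQueueLimit implements IntSupplier {
        private final long targetBytes; // total bytes to buffer, e.g. shards * bytes per shard
        private final double alpha;     // smoothing factor in (0, 1]
        private double averageRecordBytes;

        MovingAverageQueueLimit(long targetBytes, double alpha, double initialRecordBytes) {
            if (targetBytes <= 0 || alpha <= 0 || alpha > 1 || initialRecordBytes <= 0) {
                throw new IllegalArgumentException("invalid moving average parameters");
            }
            this.targetBytes = targetBytes;
            this.alpha = alpha;
            this.averageRecordBytes = initialRecordBytes;
        }

        /** Feeds the size of one serialized record into the average. */
        synchronized void observe(int recordBytes) {
            averageRecordBytes = alpha * recordBytes + (1 - alpha) * averageRecordBytes;
        }

        /** Current limit in records, clamped to [1, Integer.MAX_VALUE]. */
        @Override
        public synchronized int getAsInt() {
            long limit = (long) (targetBytes / Math.max(1.0, averageRecordBytes));
            return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, limit));
        }
    }
    ```

    The sink would call `observe(...)` for each serialized record and read `getAsInt()` where it currently reads the fixed `queueLimit`.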


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r192834879
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.util;
    +
    +public class TimeoutLatch {
    +
    +	private final Object lock = new Object();
    +	private volatile boolean waiting;
    +
    +	public void await(long timeout) throws InterruptedException {
    +		synchronized (lock) {
    +			waiting = true;
    +			lock.wait(timeout);
    +		}
    +	}
    +
    +	public void trigger() {
    +		if (waiting) {
    +			synchronized (lock) {
    +				waiting = false;
    --- End diff --
    
    This needs another `if (waiting)` check here inside the synchronized block, to ensure no one chimes in between lines 34 and 35.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197141591
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -55,6 +58,13 @@
     @PublicEvolving
     public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
     
    +	public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
    +
    +	public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
    +
    +	public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
    +
    +
    --- End diff --
    
    ✔


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r192861127
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.util;
    +
    +public class TimeoutLatch {
    +
    +	private final Object lock = new Object();
    +	private volatile boolean waiting;
    +
    +	public void await(long timeout) throws InterruptedException {
    +		synchronized (lock) {
    +			waiting = true;
    +			lock.wait(timeout);
    +		}
    +	}
    +
    +	public void trigger() {
    +		if (waiting) {
    +			synchronized (lock) {
    +				waiting = false;
    --- End diff --
    
    Why? I don't think a double-check lock is necessary here: There is no harm in setting a variable to `false` that is already `false`, and neither in `notify`ing a lock for which nobody is `wait`ing. But sure, it wouldn't harm, either. Do you insist?
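
    For reference, a sketch of the latch without the inner check. The quoted diff is cut off after `waiting = false;`, so the `notifyAll()` call below is an assumption about the rest of `trigger()`, and the class is renamed `TimeoutLatchSketch` to make clear that it is not the committed code.

    ```java
    final class TimeoutLatchSketch {

        private final Object lock = new Object();
        private volatile boolean waiting;

        /** Waits until trigger() is called or the timeout (in milliseconds) elapses. */
        void await(long timeoutMillis) throws InterruptedException {
            synchronized (lock) {
                waiting = true;
                lock.wait(timeoutMillis);
            }
        }

        /** Wakes up a waiting thread, if there is one. */
        void trigger() {
            if (waiting) {
                synchronized (lock) {
                    // Setting an already-false flag and notifying without
                    // waiters are both no-ops, so no second check is made here.
                    waiting = false;
                    lock.notifyAll();
                }
            }
        }
    }
    ```

    Calling `trigger()` repeatedly with no waiter does nothing observable, and `await` still returns after its timeout if no trigger arrives.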


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    @tzulitai Adding docs to educate users on tuning KPL performance would be good. I have quite some experience with it (as you may know :) Ping me if you start working on it before I do, and I'll be glad to help contribute.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by gliu6 <gi...@git.apache.org>.
Github user gliu6 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r196940135
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void enforceQueueLimit() {
    --- End diff --
    
    I wonder whether we could adjust the queue limit dynamically.
    You mentioned that `queue limit = (number of shards * queue size per shard) / record size`.
    Except for the record size, all of these are relatively easy to set. For me, I don't really know the record size until the application starts. Also, what if the record size varies over time?
    So how about adding a queueLimit supplier function here, to allow users to specify how the queueLimit is calculated dynamically?
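
    A rough sketch of what such a supplier could compute, based on the formula quoted above. Nothing here is part of the PR: `QueueLimitFormulaSketch`, `queueLimit`, and `dynamicLimit` are hypothetical names, and clamping the result to at least 1 is an assumption.

    ```java
    import java.util.function.IntSupplier;
    import java.util.function.LongSupplier;

    final class QueueLimitFormulaSketch {

        /**
         * queue limit = (number of shards * queue size per shard) / record size,
         * clamped to the range [1, Integer.MAX_VALUE].
         */
        static int queueLimit(int shards, long queueBytesPerShard, long recordBytes) {
            if (shards <= 0 || queueBytesPerShard <= 0 || recordBytes <= 0) {
                throw new IllegalArgumentException("all inputs must be positive");
            }
            long limit = Math.multiplyExact((long) shards, queueBytesPerShard) / recordBytes;
            return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, limit));
        }

        /** Wraps the formula so the limit is recomputed each time it is read. */
        static IntSupplier dynamicLimit(int shards, long queueBytesPerShard, LongSupplier recordBytes) {
            return () -> queueLimit(shards, queueBytesPerShard, recordBytes.getAsLong());
        }
    }
    ```

    For example, 4 shards with 10 MB buffered per shard and 1 kB records give a limit of 40,000 records; if the observed record size changes, the next read of the supplier reflects it.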



---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197137205
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java ---
    @@ -267,6 +268,79 @@ public void go() throws Exception {
     		testHarness.close();
     	}
     
    +	/**
    +	 * Test ensuring that the producer blocks if the queue limit is exceeded,
    +	 * until the queue length drops below the limit;
    +	 * we set a timeout because the test will not finish if the logic is broken.
    +	 */
    +	@Test(timeout = 10000)
    +	public void testBackpressure() throws Throwable {
    +		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +		producer.setQueueLimit(1);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +				new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		UserRecordResult result = mock(UserRecordResult.class);
    +		when(result.isSuccessful()).thenReturn(true);
    +
    +		CheckedThread msg1 = new CheckedThread() {
    +			@Override
    +			public void go() throws Exception {
    +				testHarness.processElement(new StreamRecord<>("msg-1"));
    +			}
    +		};
    +		msg1.start();
    +		msg1.trySync(100);
    +		assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
    --- End diff --
    
    @tzulitai In principle, yes, if the call `testHarness.processElement(…)` takes more than 100 milliseconds. However, I believe this is very unlikely even on slow systems, since the operation is mostly (entirely?) CPU bound. If test failures occur nevertheless, it should be no problem to increase the timeout for `msg1` and `msg2`.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197070282
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java ---
    @@ -267,6 +268,79 @@ public void go() throws Exception {
     		testHarness.close();
     	}
     
    +	/**
    +	 * Test ensuring that the producer blocks if the queue limit is exceeded,
    +	 * until the queue length drops below the limit;
    +	 * we set a timeout because the test will not finish if the logic is broken.
    +	 */
    +	@Test(timeout = 10000)
    +	public void testBackpressure() throws Throwable {
    +		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +		producer.setQueueLimit(1);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +				new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		UserRecordResult result = mock(UserRecordResult.class);
    +		when(result.isSuccessful()).thenReturn(true);
    +
    +		CheckedThread msg1 = new CheckedThread() {
    +			@Override
    +			public void go() throws Exception {
    +				testHarness.processElement(new StreamRecord<>("msg-1"));
    +			}
    +		};
    +		msg1.start();
    +		msg1.trySync(100);
    +		assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
    --- End diff --
    
    I wonder if this would introduce flakiness in the test.
    @fmthoma could you elaborate a bit here?


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    Merged manually by squashing: 7d034d4


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189432794
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void checkQueueLimit() {
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			producer.flush();
    +			try {
    +				Thread.sleep(500);
    +			} catch (InterruptedException e) {
    +				LOG.warn("Flushing was interrupted.");
    --- End diff --
    
    I don't think so: `flushSync()` will just swallow the interrupt and block again until the queue is empty. `checkQueueLimit()`, on the other hand, aborts immediately on the first interrupt. So there is a difference, although we could of course discuss which one makes more sense.
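
    The difference can be illustrated with a small stand-alone sketch. It does not use the KPL: the queue is represented by an `IntSupplier` and a `Runnable`, the class and method names are hypothetical, and restoring the interrupt flag before leaving the loop is an assumption rather than what the quoted diff does.

    ```java
    import java.util.function.IntSupplier;

    final class InterruptHandlingSketch {

        /** Like the quoted flushSync(): swallows interrupts and loops until the queue is empty. */
        static int drainIgnoringInterrupts(IntSupplier outstanding, Runnable flush) {
            int flushes = 0;
            while (outstanding.getAsInt() > 0) {
                flush.run();
                flushes++;
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // swallowed: the interrupt flag is cleared and the loop goes on
                }
            }
            return flushes;
        }

        /** Stops waiting at the first interrupt, even if the queue is still over the limit. */
        static int drainUntilInterrupted(IntSupplier outstanding, int queueLimit, Runnable flush) {
            int flushes = 0;
            while (outstanding.getAsInt() >= queueLimit) {
                flush.run();
                flushes++;
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // assumption: keep the flag set
                    break;
                }
            }
            return flushes;
        }
    }
    ```

    With a pending interrupt, the first variant still drains the whole queue, while the second returns after a single flush and leaves the remaining records queued.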


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197065961
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -144,6 +163,17 @@ public void setFailOnError(boolean failOnError) {
     		this.failOnError = failOnError;
     	}
     
    +	/**
    +	 * The {@link KinesisProducer} holds an unbounded queue internally. To avoid memory
    +	 * problems under high loads, a limit can be employed above which the internal queue
    +	 * will be flushed, thereby applying backpressure.
    +	 *
    +	 * @param queueLimit The maximum length of the internal queue before backpressuring
    +	 */
    +	public void setQueueLimit(int queueLimit) {
    +		this.queueLimit = queueLimit;
    --- End diff --
    
    Will need argument checks on the given `queueLimit`.
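
    A minimal sketch of such a check, written in plain Java so that it stands alone; the holder class `QueueLimitHolder` and the error message are hypothetical. The default of `Integer.MAX_VALUE` follows the PR description, and `queueLimit > 0` is the condition settled on elsewhere in this thread.

    ```java
    final class QueueLimitHolder {

        // The default leaves the behavior unchanged unless a limit is set explicitly.
        private int queueLimit = Integer.MAX_VALUE;

        /** Rejects zero and negative limits, which would block every record. */
        void setQueueLimit(int queueLimit) {
            if (queueLimit <= 0) {
                throw new IllegalArgumentException("queueLimit must be a positive number");
            }
            this.queueLimit = queueLimit;
        }

        int getQueueLimit() {
            return queueLimit;
        }
    }
    ```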


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197143312
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.util;
    +
    +public class TimeoutLatch {
    --- End diff --
    
    ✔


---

[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on the issue:

    https://github.com/apache/flink/pull/6021
  
    @tzulitai I believe the right location is `docs/dev/connectors/kinesis.md`? I'll add some docs there.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197069244
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.util;
    +
    +public class TimeoutLatch {
    +
    +	private final Object lock = new Object();
    +	private volatile boolean waiting;
    +
    +	public void await(long timeout) throws InterruptedException {
    +		synchronized (lock) {
    +			waiting = true;
    +			lock.wait(timeout);
    +		}
    +	}
    +
    +	public void trigger() {
    +		if (waiting) {
    +			synchronized (lock) {
    +				waiting = false;
    --- End diff --
    
    I agree with @fmthoma here.


---

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197143254
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception {
     		}
     	}
     
    +	/**
    +	 * If the internal queue of the {@link KinesisProducer} gets too long,
    +	 * flush some of the records until we are below the limit again.
    +	 * We don't want to flush _all_ records at this point since that would
    +	 * break record aggregation.
    +	 */
    +	private void enforceQueueLimit() {
    +		int attempt = 0;
    +		while (producer.getOutstandingRecordsCount() >= queueLimit) {
    +			backpressureCycles.inc();
    +			if (attempt >= 10) {
    +				LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt);
    +			}
    +			attempt++;
    +			try {
    +				backpressureLatch.await(100);
    --- End diff --
    
    It does, but if we make it configurable, I'd rather keep the warning threshold at one second, i.e. `if (attempt >= 1000 / waitTime) { … }`.
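
    A small sketch of that threshold calculation, assuming a configurable wait time in milliseconds. The names are hypothetical, and the lower bound of one attempt is an addition: with integer division, a wait time above 1000 ms would otherwise yield a threshold of 0.

    ```java
    final class WarnThresholdSketch {

        static final long WARN_AFTER_MILLIS = 1000L;

        /** Number of wait cycles after which roughly one second has passed. */
        static long warnThreshold(long waitTimeMillis) {
            if (waitTimeMillis <= 0) {
                throw new IllegalArgumentException("waitTimeMillis must be positive");
            }
            // Integer division rounds down; never go below one attempt.
            return Math.max(1L, WARN_AFTER_MILLIS / waitTimeMillis);
        }
    }
    ```

    With the current wait time of 100 ms this gives the existing threshold of 10 attempts.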


---