You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/06/22 07:31:39 UTC
[7/7] flink git commit: [FLINK-9374] [kinesis] Enable
FlinkKinesisProducer backpressuring
[FLINK-9374] [kinesis] Enable FlinkKinesisProducer backpressuring
This closes #9374.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d034d4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d034d4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d034d4e
Branch: refs/heads/master
Commit: 7d034d4ef6986ba5ccda6f5e8c587b8fdd88be8e
Parents: f1f79ee
Author: Franz Thoma <fr...@tngtech.com>
Authored: Wed May 9 08:27:47 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:58 2018 +0200
----------------------------------------------------------------------
docs/dev/connectors/kinesis.md | 30 +++++++
.../kinesis/FlinkKinesisProducer.java | 70 ++++++++++++++-
.../connectors/kinesis/util/TimeoutLatch.java | 44 +++++++++
.../kinesis/FlinkKinesisProducerTest.java | 95 +++++++++++++++-----
.../flink/core/testutils/CheckedThread.java | 13 ++-
.../flink/core/testutils/MultiShotLatch.java | 9 ++
6 files changed, 239 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 7551142..03224a1 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -344,6 +344,36 @@ Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL fr
Users can still switch back to one-thread-per-request mode by setting a key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`, as shown in the code commented out in above example.
+### Backpressure
+
+By default, `FlinkKinesisProducer` does not backpressure. Instead, records that
+cannot be sent because of the rate restriction of 1 MB per second per shard are
+buffered in an unbounded queue and dropped when their `RecordTtl` expires.
+
+To avoid data loss, you can enable backpressuring by restricting the size of the
+internal queue:
+
+```
+// 200 Bytes per record, 1 shard
+kinesis.setQueueLimit(500);
+```
+
+The value for `queueLimit` depends on the expected record size. To choose a good
+value, consider that Kinesis is rate-limited to 1 MB per second per shard. If
+less than one second's worth of records is buffered, then the queue may not be
+able to operate at full capacity. However, with the default `RecordMaxBufferedTime`
+of 100 ms, buffered records are sent at least every 100 ms, so a queue size of
+100 kB per shard (about 100 ms worth of data at 1 MB per second) should be
+sufficient. The `queueLimit` can then be computed via
+
+```
+queue limit = (number of shards * queue size per shard) / record size
+```
+
+For example, with 200 bytes per record and 8 shards, a queue limit of
+4000 (8 * 100 kB / 200 bytes) is a good starting point. If the queue limit
+restricts throughput (to below 1 MB per second per shard), try increasing it slightly.
+
+
## Using Non-AWS Kinesis Endpoints for Testing
It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index a9b48ae..b086ac1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -21,12 +21,15 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.TimeoutLatch;
import org.apache.flink.util.InstantiationUtil;
import com.amazonaws.services.kinesis.producer.Attempt;
@@ -55,6 +58,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@PublicEvolving
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
+ public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
+
+ public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
+
+ public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
+
private static final long serialVersionUID = 6447077318449477846L;
private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
@@ -65,6 +74,9 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
/* Flag controlling the error behavior of the producer */
private boolean failOnError = false;
+ /* Maximum length of the internal record queue before backpressuring */
+ private int queueLimit = Integer.MAX_VALUE;
+
/* Name of the default stream to produce to. Can be overwritten by the serialization schema */
private String defaultStream;
@@ -82,9 +94,15 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
/* Our Kinesis instance for each parallel Flink sink */
private transient KinesisProducer producer;
+ /* Backpressuring waits for this latch, triggered by record callback */
+ private transient TimeoutLatch backpressureLatch;
+
/* Callback handling failures */
private transient FutureCallback<UserRecordResult> callback;
+ /* Counts how often we have to wait for KPL because we are above the queue limit */
+ private transient Counter backpressureCycles;
+
/* Field for async exception */
private transient volatile Throwable thrownException;
@@ -145,6 +163,18 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
}
/**
+ * Bounds the otherwise unbounded internal queue of the {@link KinesisProducer}.
+ * While the number of outstanding records is at or above this limit, the sink
+ * waits before adding the next record instead of growing the queue, which
+ * applies backpressure to upstream operators.
+ *
+ * @param queueLimit maximum number of outstanding records before backpressuring; must be positive
+ * @throws IllegalArgumentException if {@code queueLimit} is zero or negative
+ */
+ public void setQueueLimit(int queueLimit) {
+     if (queueLimit <= 0) {
+         throw new IllegalArgumentException("queueLimit must be a positive number");
+     }
+     this.queueLimit = queueLimit;
+ }
+
+ /**
* Set a default stream name.
* @param defaultStream Name of the default Kinesis stream
*/
@@ -180,9 +210,16 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
producer = getKinesisProducer(producerConfig);
+
+ final MetricGroup kinesisMectricGroup = getRuntimeContext().getMetricGroup().addGroup(KINESIS_PRODUCER_METRIC_GROUP);
+ this.backpressureCycles = kinesisMectricGroup.counter(METRIC_BACKPRESSURE_CYCLES);
+ kinesisMectricGroup.gauge(METRIC_OUTSTANDING_RECORDS_COUNT, producer::getOutstandingRecordsCount);
+
+ backpressureLatch = new TimeoutLatch();
callback = new FutureCallback<UserRecordResult>() {
@Override
public void onSuccess(UserRecordResult result) {
+ backpressureLatch.trigger();
if (!result.isSuccessful()) {
if (failOnError) {
// only remember the first thrown exception
@@ -197,6 +234,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
@Override
public void onFailure(Throwable t) {
+ backpressureLatch.trigger();
if (failOnError) {
thrownException = t;
} else {
@@ -219,6 +257,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
}
checkAndPropagateAsyncError();
+ boolean didWaitForFlush = enforceQueueLimit();
+
+ if (didWaitForFlush) {
+ checkAndPropagateAsyncError();
+ }
String stream = defaultStream;
String partition = defaultPartition;
@@ -327,6 +370,32 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
}
/**
+ * Applies backpressure while the internal queue of the {@link KinesisProducer}
+ * holds at least {@code queueLimit} outstanding records.
+ *
+ * <p>This method does not call {@code flush()} itself: flushing _all_ records
+ * here would break record aggregation, so it waits for the KPL to drain the
+ * queue on its own. Each wait is bounded by 100 ms and ends early when a record
+ * callback triggers {@code backpressureLatch}.
+ *
+ * <p>If the waiting thread is interrupted, its interrupt status is restored and
+ * the method stops waiting.
+ *
+ * @return whether the method had to wait at least once
+ */
+ private boolean enforceQueueLimit() {
+     int attempt = 0;
+     while (producer.getOutstandingRecordsCount() >= queueLimit) {
+         backpressureCycles.inc();
+         if (attempt >= 10) {
+             LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt);
+         }
+         attempt++;
+         try {
+             backpressureLatch.await(100);
+         } catch (InterruptedException e) {
+             LOG.warn("Flushing was interrupted.");
+             // restore the interrupt status so code further up the stack can observe the interruption
+             Thread.currentThread().interrupt();
+             break;
+         }
+     }
+     return attempt > 0;
+ }
+
+ /**
* A reimplementation of {@link KinesisProducer#flushSync()}.
* This implementation releases the block on flushing if an interruption occurred.
*/
@@ -337,7 +406,6 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.warn("Flushing was interrupted.");
-
break;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
new file mode 100644
index 0000000..4dcab33
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A reusable wait/notify helper. {@link #await(long)} blocks the caller for at most
+ * the given time, and {@link #trigger()} wakes up all threads currently blocked in
+ * {@link #await(long)}.
+ *
+ * <p>A trigger that happens while no thread is waiting is not remembered, so callers
+ * must re-check their condition after {@link #await(long)} returns.
+ */
+@Internal
+public class TimeoutLatch {
+
+    /** Monitor used both for waiting and for notification. */
+    private final Object monitor = new Object();
+
+    /** Set by {@link #await(long)}; lets {@link #trigger()} skip locking when nobody waited since the last trigger. */
+    private volatile boolean waiting;
+
+    /**
+     * Waits until {@link #trigger()} is called, the timeout elapses, or a spurious wakeup occurs.
+     *
+     * @param timeout maximum wait in milliseconds, as interpreted by {@link Object#wait(long)} (0 means no timeout)
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void await(long timeout) throws InterruptedException {
+        synchronized (monitor) {
+            waiting = true;
+            monitor.wait(timeout);
+        }
+    }
+
+    /**
+     * Wakes up all threads blocked in {@link #await(long)}; does nothing if the
+     * waiting flag is not set.
+     */
+    public void trigger() {
+        if (!waiting) {
+            return;
+        }
+        synchronized (monitor) {
+            waiting = false;
+            monitor.notifyAll();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 86cefff..6355cf5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@@ -267,6 +268,79 @@ public class FlinkKinesisProducerTest {
testHarness.close();
}
+ /**
+ * Test ensuring that the producer blocks if the queue limit is exceeded,
+ * until the queue length drops below the limit;
+ * we set a timeout because the test will not finish if the logic is broken.
+ *
+ * <p>Checks that a thread is still blocked use a short wait. Checks that a thread
+ * has finished use a longer upper bound; the join returns as soon as the thread
+ * terminates, so this only costs time when something is actually stuck, and it
+ * avoids flaky failures on slow machines.
+ */
+ @Test(timeout = 10000)
+ public void testBackpressure() throws Throwable {
+     // short wait used to assert that a thread is (still) blocked
+     final long blockCheckMillis = 100;
+     // upper bound used to assert that a thread has finished
+     final long completionMillis = 1000;
+
+     final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+     producer.setQueueLimit(1);
+
+     final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+         new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+     testHarness.open();
+
+     UserRecordResult result = mock(UserRecordResult.class);
+     when(result.isSuccessful()).thenReturn(true);
+
+     CheckedThread msg1 = new CheckedThread() {
+         @Override
+         public void go() throws Exception {
+             testHarness.processElement(new StreamRecord<>("msg-1"));
+         }
+     };
+     msg1.start();
+     msg1.trySync(completionMillis);
+     assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
+
+     // consume msg-1 so that queue is empty again
+     producer.getPendingRecordFutures().get(0).set(result);
+
+     CheckedThread msg2 = new CheckedThread() {
+         @Override
+         public void go() throws Exception {
+             testHarness.processElement(new StreamRecord<>("msg-2"));
+         }
+     };
+     msg2.start();
+     msg2.trySync(completionMillis);
+     assertFalse("Flush triggered before reaching queue limit", msg2.isAlive());
+
+     CheckedThread moreElementsThread = new CheckedThread() {
+         @Override
+         public void go() throws Exception {
+             // this should block until msg-2 is consumed
+             testHarness.processElement(new StreamRecord<>("msg-3"));
+             // this should block until msg-3 is consumed
+             testHarness.processElement(new StreamRecord<>("msg-4"));
+         }
+     };
+     moreElementsThread.start();
+
+     moreElementsThread.trySync(blockCheckMillis);
+     assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+     // consume msg-2 from the queue, leaving msg-3 in the queue and msg-4 blocked
+     producer.getPendingRecordFutures().get(1).set(result);
+
+     moreElementsThread.trySync(blockCheckMillis);
+     assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+     // consume msg-3, blocked msg-4 can be inserted into the queue and block is released
+     producer.getPendingRecordFutures().get(2).set(result);
+
+     moreElementsThread.trySync(completionMillis);
+
+     assertFalse("Producer still blocks although the queue is flushed", moreElementsThread.isAlive());
+
+     producer.getPendingRecordFutures().get(3).set(result);
+
+     testHarness.close();
+ }
+
// ----------------------------------------------------------------------
// Utility test classes
// ----------------------------------------------------------------------
@@ -346,7 +420,6 @@ public class FlinkKinesisProducerTest {
private List<SettableFuture<UserRecordResult>> pendingRecordFutures = new LinkedList<>();
private transient MultiShotLatch flushLatch;
- private boolean isFlushed;
DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
super(schema, TestUtils.getStandardProperties());
@@ -378,13 +451,6 @@ public class FlinkKinesisProducerTest {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
flushLatch.trigger();
-
- while (!isAllRecordFuturesCompleted()) {
- Thread.sleep(50);
- }
-
- isFlushed = true;
-
+ // flush() returns immediately after signalling the latch; tests complete the record
+ // futures themselves, and snapshotState checks getOutstandingRecordsCount() instead
return null;
}
}).when(mockProducer).flush();
@@ -399,12 +465,11 @@ public class FlinkKinesisProducerTest {
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
- isFlushed = false;
super.snapshotState(context);
// if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
- if (!isFlushed) {
+ // NOTE(review): assumes the mocked getOutstandingRecordsCount() counts record futures not yet completed by the test — confirm against the mock setup
+ if (mockProducer.getOutstandingRecordsCount() > 0) {
throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
}
}
@@ -417,16 +482,6 @@ public class FlinkKinesisProducerTest {
flushLatch.await();
}
- private boolean isAllRecordFuturesCompleted() {
- for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
- if (!future.isDone()) {
- return false;
- }
- }
-
- return true;
- }
-
private int getNumPendingRecordFutures() {
int numPending = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
index f2647cc..7aa177a 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -97,8 +97,19 @@ public abstract class CheckedThread extends Thread {
* exceptions thrown from the {@link #go()} method.
*/
public void sync(long timeout) throws Exception {
- join(timeout);
+ // trySync joins and checks for errors from go(); checkFinished() then presumably
+ // fails if the thread is still alive after the timeout (confirm in checkFinished())
+ trySync(timeout);
checkFinished();
+ }
+
+ /**
+ * Waits at most {@code timeout} milliseconds (as for {@link Thread#join(long)}, where 0
+ * means wait forever) for the thread to terminate, then checks whether an exception was
+ * thrown from the {@link #go()} method.
+ *
+ * <p>Unlike {@link #sync(long)}, this method does not fail when the thread is still
+ * running after the timeout; callers can inspect {@link #isAlive()} afterwards.
+ */
+ public void trySync(long timeout) throws Exception {
+ join(timeout);
checkError();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
index 69f73eb..861ea31 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
@@ -52,4 +52,13 @@ public final class MultiShotLatch {
triggered = false;
}
}
+
+ /**
+ * Reports the current value of the latch's {@code triggered} flag without blocking
+ * and without resetting it.
+ *
+ * <p>NOTE(review): the flag is read without synchronization; unless the field is
+ * volatile, a value written by another thread may not be visible yet — confirm.
+ *
+ * @return {@code true} if the latch is currently in the triggered state, {@code false} otherwise
+ */
+ public boolean isTriggered() {
+     return this.triggered;
+ }
}