You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/06/22 09:21:59 UTC

[6/6] flink git commit: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer backpressuring

[FLINK-9374] [kinesis] Enable FlinkKinesisProducer backpressuring

This closes #9374.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b725982e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b725982e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b725982e

Branch: refs/heads/release-1.5
Commit: b725982e5758043ba3aa53bde1615569336e451e
Parents: 2c1e3f0
Author: Franz Thoma <fr...@tngtech.com>
Authored: Wed May 9 08:27:47 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 11:21:19 2018 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 30 +++++++
 .../kinesis/FlinkKinesisProducer.java           | 70 ++++++++++++++-
 .../connectors/kinesis/util/TimeoutLatch.java   | 44 +++++++++
 .../kinesis/FlinkKinesisProducerTest.java       | 95 +++++++++++++++-----
 .../flink/core/testutils/CheckedThread.java     | 13 ++-
 .../flink/core/testutils/MultiShotLatch.java    |  9 ++
 6 files changed, 239 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 7551142..03224a1 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -344,6 +344,36 @@ Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL fr
 
 Users can still switch back to one-thread-per-request mode by setting a key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`, as shown in the code commented out in above example.
 
+### Backpressure
+
+By default, `FlinkKinesisProducer` does not backpressure. Instead, records that
+cannot be sent because of the rate restriction of 1 MB per second per shard are
+buffered in an unbounded queue and dropped when their `RecordTtl` expires.
+
+To avoid data loss, you can enable backpressuring by restricting the size of the
+internal queue:
+
+```
+// 200 Bytes per record, 1 shard
+kinesis.setQueueLimit(500);
+```
+
+The value for `queueLimit` depends on the expected record size. To choose a good
+value, consider that Kinesis is rate-limited to 1MB per second per shard. If
+less than one second's worth of records is buffered, then the queue may not be
+able to operate at full capacity. With the default `RecordMaxBufferedTime` of
+100ms, a queue size of 100kB per shard should be sufficient. The `queueLimit`
+can then be computed via
+
+```
+queue limit = (number of shards * queue size per shard) / record size
+```
+
+E.g. for 200 bytes per record and 8 shards, a queue limit of 4000 is a good
+starting point. If the queue size limits throughput (below 1 MB per second per
+shard), try increasing the queue limit slightly.
+
+
 ## Using Non-AWS Kinesis Endpoints for Testing
 
 It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index a9b48ae..b086ac1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -21,12 +21,15 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.TimeoutLatch;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.amazonaws.services.kinesis.producer.Attempt;
@@ -55,6 +58,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
 
+	public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
+
+	public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
+
+	public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
+
 	private static final long serialVersionUID = 6447077318449477846L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
@@ -65,6 +74,9 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	/* Flag controlling the error behavior of the producer */
 	private boolean failOnError = false;
 
+	/* Maximum length of the internal record queue before backpressuring */
+	private int queueLimit = Integer.MAX_VALUE;
+
 	/* Name of the default stream to produce to. Can be overwritten by the serialization schema */
 	private String defaultStream;
 
@@ -82,9 +94,15 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	/* Our Kinesis instance for each parallel Flink sink */
 	private transient KinesisProducer producer;
 
+	/* Backpressuring waits for this latch, triggered by record callback */
+	private transient TimeoutLatch backpressureLatch;
+
 	/* Callback handling failures */
 	private transient FutureCallback<UserRecordResult> callback;
 
+	/* Counts how often we have to wait for KPL because we are above the queue limit */
+	private transient Counter backpressureCycles;
+
 	/* Field for async exception */
 	private transient volatile Throwable thrownException;
 
@@ -145,6 +163,18 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	}
 
 	/**
+	 * The {@link KinesisProducer} holds an unbounded queue internally. To avoid memory
+	 * problems under high loads, a limit can be employed above which the internal queue
+	 * will be flushed, thereby applying backpressure.
+	 *
+	 * @param queueLimit The maximum length of the internal queue before backpressuring
+	 */
+	public void setQueueLimit(int queueLimit) {
+		checkArgument(queueLimit > 0, "queueLimit must be a positive number");
+		this.queueLimit = queueLimit;
+	}
+
+	/**
 	 * Set a default stream name.
 	 * @param defaultStream Name of the default Kinesis stream
 	 */
@@ -180,9 +210,16 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
 
 		producer = getKinesisProducer(producerConfig);
+
+		final MetricGroup kinesisMectricGroup = getRuntimeContext().getMetricGroup().addGroup(KINESIS_PRODUCER_METRIC_GROUP);
+		this.backpressureCycles = kinesisMectricGroup.counter(METRIC_BACKPRESSURE_CYCLES);
+		kinesisMectricGroup.gauge(METRIC_OUTSTANDING_RECORDS_COUNT, producer::getOutstandingRecordsCount);
+
+		backpressureLatch = new TimeoutLatch();
 		callback = new FutureCallback<UserRecordResult>() {
 			@Override
 			public void onSuccess(UserRecordResult result) {
+				backpressureLatch.trigger();
 				if (!result.isSuccessful()) {
 					if (failOnError) {
 						// only remember the first thrown exception
@@ -197,6 +234,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 
 			@Override
 			public void onFailure(Throwable t) {
+				backpressureLatch.trigger();
 				if (failOnError) {
 					thrownException = t;
 				} else {
@@ -219,6 +257,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 		}
 
 		checkAndPropagateAsyncError();
+		boolean didWaitForFlush = enforceQueueLimit();
+
+		if (didWaitForFlush) {
+			checkAndPropagateAsyncError();
+		}
 
 		String stream = defaultStream;
 		String partition = defaultPartition;
@@ -327,6 +370,32 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	}
 
 	/**
+	 * If the internal queue of the {@link KinesisProducer} gets too long, block until
+	 * enough records have completed that we are below the limit again (or we are interrupted).
+	 * We deliberately do not flush _all_ records at this point since that would
+	 * break record aggregation.
+	 *
+	 * @return boolean whether we had to wait for the queue to shrink or not
+	 */
+	private boolean enforceQueueLimit() {
+		int attempt = 0;
+		while (producer.getOutstandingRecordsCount() >= queueLimit) {
+			backpressureCycles.inc();
+			if (attempt >= 10) {
+				LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt);
+			}
+			attempt++;
+			try {
+				backpressureLatch.await(100);
+			} catch (InterruptedException e) {
+				LOG.warn("Flushing was interrupted.");
+				break;
+			}
+		}
+		return attempt > 0;
+	}
+
+	/**
 	 * A reimplementation of {@link KinesisProducer#flushSync()}.
 	 * This implementation releases the block on flushing if an interruption occurred.
 	 */
@@ -337,7 +406,6 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 				Thread.sleep(500);
 			} catch (InterruptedException e) {
 				LOG.warn("Flushing was interrupted.");
-
 				break;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
new file mode 100644
index 0000000..4dcab33
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+
+@Internal
+public class TimeoutLatch {
+
+	private final Object lock = new Object();
+	private volatile boolean waiting;
+
+	public void await(long timeout) throws InterruptedException {
+		synchronized (lock) {
+			waiting = true;
+			lock.wait(timeout);
+		}
+	}
+
+	public void trigger() {
+		if (waiting) {
+			synchronized (lock) {
+				waiting = false;
+				lock.notifyAll();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 86cefff..6355cf5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -267,6 +268,79 @@ public class FlinkKinesisProducerTest {
 		testHarness.close();
 	}
 
+	/**
+	 * Test ensuring that the producer blocks if the queue limit is exceeded,
+	 * until the queue length drops below the limit;
+	 * we set a timeout because the test will not finish if the logic is broken.
+	 */
+	@Test(timeout = 10000)
+	public void testBackpressure() throws Throwable {
+		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+		producer.setQueueLimit(1);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+				new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		UserRecordResult result = mock(UserRecordResult.class);
+		when(result.isSuccessful()).thenReturn(true);
+
+		CheckedThread msg1 = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				testHarness.processElement(new StreamRecord<>("msg-1"));
+			}
+		};
+		msg1.start();
+		msg1.trySync(100);
+		assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
+
+		// consume msg-1 so that queue is empty again
+		producer.getPendingRecordFutures().get(0).set(result);
+
+		CheckedThread msg2 = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				testHarness.processElement(new StreamRecord<>("msg-2"));
+			}
+		};
+		msg2.start();
+		msg2.trySync(100);
+		assertFalse("Flush triggered before reaching queue limit", msg2.isAlive());
+
+		CheckedThread moreElementsThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				// this should block until msg-2 is consumed
+				testHarness.processElement(new StreamRecord<>("msg-3"));
+				// this should block until msg-3 is consumed
+				testHarness.processElement(new StreamRecord<>("msg-4"));
+			}
+		};
+		moreElementsThread.start();
+
+		moreElementsThread.trySync(100);
+		assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+		// consume msg-2 from the queue, leaving msg-3 in the queue and msg-4 blocked
+		producer.getPendingRecordFutures().get(1).set(result);
+
+		moreElementsThread.trySync(100);
+		assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+		// consume msg-3, blocked msg-4 can be inserted into the queue and block is released
+		producer.getPendingRecordFutures().get(2).set(result);
+
+		moreElementsThread.trySync(100);
+
+		assertFalse("Prodcuer still blocks although the queue is flushed", moreElementsThread.isAlive());
+
+		producer.getPendingRecordFutures().get(3).set(result);
+
+		testHarness.close();
+	}
+
 	// ----------------------------------------------------------------------
 	// Utility test classes
 	// ----------------------------------------------------------------------
@@ -346,7 +420,6 @@ public class FlinkKinesisProducerTest {
 		private List<SettableFuture<UserRecordResult>> pendingRecordFutures = new LinkedList<>();
 
 		private transient MultiShotLatch flushLatch;
-		private boolean isFlushed;
 
 		DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
 			super(schema, TestUtils.getStandardProperties());
@@ -378,13 +451,6 @@ public class FlinkKinesisProducerTest {
 				@Override
 				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 					flushLatch.trigger();
-
-					while (!isAllRecordFuturesCompleted()) {
-						Thread.sleep(50);
-					}
-
-					isFlushed = true;
-
 					return null;
 				}
 			}).when(mockProducer).flush();
@@ -399,12 +465,11 @@ public class FlinkKinesisProducerTest {
 
 		@Override
 		public void snapshotState(FunctionSnapshotContext context) throws Exception {
-			isFlushed = false;
 
 			super.snapshotState(context);
 
 			// if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
-			if (!isFlushed) {
+			if (mockProducer.getOutstandingRecordsCount() > 0) {
 				throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
 			}
 		}
@@ -417,16 +482,6 @@ public class FlinkKinesisProducerTest {
 			flushLatch.await();
 		}
 
-		private boolean isAllRecordFuturesCompleted() {
-			for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
-				if (!future.isDone()) {
-					return false;
-				}
-			}
-
-			return true;
-		}
-
 		private int getNumPendingRecordFutures() {
 			int numPending = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
index f2647cc..7aa177a 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -97,8 +97,19 @@ public abstract class CheckedThread extends Thread {
 	 * exceptions thrown from the {@link #go()} method.
 	 */
 	public void sync(long timeout) throws Exception {
-		join(timeout);
+		trySync(timeout);
 		checkFinished();
+	}
+
+	/**
+	 * Waits at most {@code timeout} milliseconds for the thread to complete and checks
+	 * whether any error occurred during the execution.
+	 *
+	 * <p>This method blocks like {@link #join(long)}, but performs an additional check for
+	 * exceptions thrown from the {@link #go()} method. Unlike {@link #sync(long)}, it does not call {@code checkFinished()}.
+	 */
+	public void trySync(long timeout) throws Exception {
+		join(timeout);
 		checkError();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
index 69f73eb..861ea31 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
@@ -52,4 +52,13 @@ public final class MultiShotLatch {
 			triggered = false;
 		}
 	}
+
+	/**
+	 * Checks if the latch was triggered.
+	 *
+	 * @return True, if the latch was triggered, false if not.
+	 */
+	public boolean isTriggered() {
+		return triggered;
+	}
 }