You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/05/07 15:25:13 UTC
[flink] 02/02: [FLINK-17170][kinesis] Fix deadlock during
stop-with-savepoint.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 282f9a3d5505a5aa58d7d9cca466939610d41ed3
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue May 4 09:38:30 2021 +0200
[FLINK-17170][kinesis] Fix deadlock during stop-with-savepoint.
During stop-with-savepoint, cancel is called under the checkpoint lock in legacy sources. Thus, if the fetcher is trying to emit a record at the same time, it cannot obtain the checkpoint lock. This leads to a deadlock because cancel awaits the termination of the fetcher.
The fix is to rely mostly on the termination inside the run method. As a safe-guard, close also awaits termination, since close is always called without holding the lock.
---
flink-connectors/flink-connector-kinesis/pom.xml | 19 +++
.../connectors/kinesis/FlinkKinesisConsumer.java | 5 +-
.../connectors/kinesis/FlinkKinesisITCase.java | 157 +++++++++++++++++++++
3 files changed, 179 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 4339ac1..6aae404 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -169,6 +169,13 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>1.15.3</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- Kinesis table factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -336,6 +343,18 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.22.1</version>
+ <configuration>
+ <systemPropertyVariables>
+ <com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
+ <com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 7f30dca..b0a729f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -401,7 +401,6 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
running = false;
KinesisDataFetcher fetcher = this.fetcher;
- this.fetcher = null;
// this method might be called before the subtask actually starts running,
// so we must check if the fetcher is actually created
@@ -409,7 +408,6 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
try {
// interrupt the fetcher of any work
fetcher.shutdownFetcher();
- fetcher.awaitTermination();
} catch (Exception e) {
LOG.warn("Error while closing Kinesis data fetcher", e);
}
@@ -419,6 +417,9 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
@Override
public void close() throws Exception {
cancel();
+ // safe-guard against leaking fetcher resources; like in cancel(), the fetcher may be null if the subtask never ran
+ if (fetcher != null) { fetcher.awaitTermination(); }
+ this.fetcher = null;
super.close();
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
new file mode 100644
index 0000000..0b224c3
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertThat;
+
+/** IT cases for using Kinesis consumer/producer based on Kinesalite. */
+public class FlinkKinesisITCase {
+ public static final String TEST_STREAM = "test_stream"; // stream created in Kinesalite by each test
+
+ @ClassRule
+ public static MiniClusterWithClientResource miniCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder().build());
+
+ @ClassRule
+ public static KinesaliteContainer kinesalite =
+ new KinesaliteContainer(
+ DockerImageName.parse("instructure/kinesalite").withTag("latest"));
+
+ @Rule public TemporaryFolder temp = new TemporaryFolder(); // savepoint target directory
+
+ private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
+
+ private KinesisPubsubClient client;
+
+ @Before
+ public void setupClient() {
+ client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+ }
+
+ /**
+ * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
+ *
+ * <ol>
+ * <li>The test sets up a stream with 10 records and creates a Flink job that reads them
+ * very slowly (using up a large chunk of time of the mailbox).
+ * <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
+ * <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
+ * cancel.
+ * <li>With the fix, the job proceeds and we can lift the backpressure.
+ * </ol>
+ */
+ @Test
+ public void testStopWithSavepoint() throws Exception {
+ client.createTopic(TEST_STREAM, 1, new Properties());
+
+ // add elements to the test stream
+ int numElements = 10;
+ List<String> elements =
+ IntStream.range(0, numElements)
+ .mapToObj(String::valueOf)
+ .collect(Collectors.toList());
+ for (String element : elements) {
+ client.sendMessage(TEST_STREAM, element);
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ Properties config = kinesalite.getContainerProperties();
+ config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+ FlinkKinesisConsumer<String> consumer =
+ new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+
+ // call stop with savepoint in another thread
+ ForkJoinTask<Object> stopTask =
+ ForkJoinPool.commonPool()
+ .submit(
+ () -> {
+ WaitingMapper.firstElement.await(); // wait until the mapper saw a record
+ stopWithSavepoint();
+ WaitingMapper.stopped = true; // lift the throttling in WaitingMapper
+ return null;
+ });
+
+ try {
+ List<String> result =
+ env.addSource(consumer).map(new WaitingMapper()).executeAndCollect(10000);
+ // stop with savepoint will most likely only return a small subset of the elements
+ // validate that the prefix is as expected
+ assertThat(result, hasSize(lessThan(numElements)));
+ assertThat(result, equalTo(elements.subList(0, result.size())));
+ } finally {
+ stopTask.cancel(true); // NOTE(review): per JDK docs ForkJoinTask#cancel does not interrupt a running task, and stop failures are never surfaced here — confirm intended
+ }
+ }
+
+ private String stopWithSavepoint() throws Exception { // stops the first job listed by the mini cluster, savepointing under temp
+ JobStatusMessage job =
+ miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+ return miniCluster
+ .getClusterClient()
+ .stopWithSavepoint(job.getJobId(), true, temp.getRoot().getAbsolutePath())
+ .get();
+ }
+
+ private static class WaitingMapper implements MapFunction<String, String> { // throttles records until the job is stopped
+ static CountDownLatch firstElement = new CountDownLatch(1); // released when the first record reaches map()
+ static volatile boolean stopped = false; // set by the stop thread; disables the sleep below
+
+ @Override
+ public String map(String value) throws Exception {
+ if (firstElement.getCount() > 0) {
+ firstElement.countDown();
+ }
+ if (!stopped) {
+ Thread.sleep(100); // keep each record slow so the stop request races with pending output
+ }
+ return value;
+ }
+ }
+}