Posted to commits@flink.apache.org by ar...@apache.org on 2021/05/07 15:25:13 UTC

[flink] 02/02: [FLINK-17170][kinesis] Fix deadlock during stop-with-savepoint.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 282f9a3d5505a5aa58d7d9cca466939610d41ed3
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue May 4 09:38:30 2021 +0200

    [FLINK-17170][kinesis] Fix deadlock during stop-with-savepoint.
    
    During stop-with-savepoint, cancel is called while the checkpoint lock is held in legacy sources. If the fetcher is trying to emit a record at the same time, it cannot obtain the checkpoint lock, so cancel deadlocks while awaiting the termination of the fetcher.
    The fix is to rely mostly on the termination inside the run method. As a safeguard, close also awaits termination; close is always called without the lock.
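    
    The failure mode reduces to a minimal standalone sketch (illustrative names only, not the actual Flink classes): one thread needs the checkpoint lock to emit a record, while the cancelling thread holds that lock and waits for the emitter to terminate.
    
        public final class CheckpointLockDeadlock {
            public static void main(String[] args) throws InterruptedException {
                final Object checkpointLock = new Object();
                Thread fetcher = new Thread(() -> {
                    while (!Thread.currentThread().isInterrupted()) {
                        synchronized (checkpointLock) {
                            // emit a record under the checkpoint lock
                        }
                    }
                });
                fetcher.start();
                Thread.sleep(100); // let the fetcher start contending for the lock
    
                synchronized (checkpointLock) {
                    // stop-with-savepoint: cancel runs while the lock is held ...
                    fetcher.join(); // ... and waits for the fetcher, which itself
                                    // waits for the lock -> deadlock
                }
            }
        }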
---
 flink-connectors/flink-connector-kinesis/pom.xml   |  19 +++
 .../connectors/kinesis/FlinkKinesisConsumer.java   |   5 +-
 .../connectors/kinesis/FlinkKinesisITCase.java     | 157 +++++++++++++++++++++
 3 files changed, 179 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 4339ac1..6aae404 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -169,6 +169,13 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>testcontainers</artifactId>
+			<version>1.15.3</version>
+			<scope>test</scope>
+		</dependency>
+
 		<!-- Kinesis table factory testing -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -336,6 +343,18 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.22.1</version>
+				<configuration>
+					<systemPropertyVariables>
+						<com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
+						<com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 </project>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 7f30dca..b0a729f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -401,7 +401,6 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
         running = false;
 
         KinesisDataFetcher fetcher = this.fetcher;
-        this.fetcher = null;
 
         // this method might be called before the subtask actually starts running,
         // so we must check if the fetcher is actually created
@@ -409,7 +408,6 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
             try {
                 // interrupt the fetcher of any work
                 fetcher.shutdownFetcher();
-                fetcher.awaitTermination();
             } catch (Exception e) {
                 LOG.warn("Error while closing Kinesis data fetcher", e);
             }
@@ -419,6 +417,9 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
     @Override
     public void close() throws Exception {
         cancel();
+        // safeguard in case the fetcher has been interrupted: make sure not to leak resources
+        fetcher.awaitTermination();
+        this.fetcher = null;
         super.close();
     }
 
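Taken together, the patched shutdown path looks roughly like this (a condensed sketch of the two methods above, surrounding class and fields omitted): cancel only signals the fetcher, and the blocking wait moves into close, which is never invoked under the checkpoint lock.

    @Override
    public void cancel() {
        running = false;
        KinesisDataFetcher fetcher = this.fetcher;
        // cancel may run under the checkpoint lock, so only signal the
        // fetcher here and never block waiting for its termination
        if (fetcher != null) {
            try {
                fetcher.shutdownFetcher();
            } catch (Exception e) {
                LOG.warn("Error while closing Kinesis data fetcher", e);
            }
        }
    }

    @Override
    public void close() throws Exception {
        cancel();
        // close always runs without the checkpoint lock, so blocking on
        // termination here cannot deadlock with a record-emitting fetcher
        fetcher.awaitTermination();
        this.fetcher = null;
        super.close();
    }
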
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
new file mode 100644
index 0000000..0b224c3
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertThat;
+
+/** IT cases for using Kinesis consumer/producer based on Kinesalite. */
+public class FlinkKinesisITCase {
+    public static final String TEST_STREAM = "test_stream";
+
+    @ClassRule
+    public static MiniClusterWithClientResource miniCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder().build());
+
+    @ClassRule
+    public static KinesaliteContainer kinesalite =
+            new KinesaliteContainer(
+                    DockerImageName.parse("instructure/kinesalite").withTag("latest"));
+
+    @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+    private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
+
+    private KinesisPubsubClient client;
+
+    @Before
+    public void setupClient() {
+        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+    }
+
+    /**
+     * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
+     *
+     * <ol>
+     *   <li>The test sets up a stream with 10 records and creates a Flink job that reads them
+     *       very slowly (using up a large chunk of the mailbox time).
+     *   <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
+     *   <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
+     *       cancel.
+     *   <li>With the fix, the job proceeds and we can lift the backpressure.
+     * </ol>
+     */
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        client.createTopic(TEST_STREAM, 1, new Properties());
+
+        // add elements to the test stream
+        int numElements = 10;
+        List<String> elements =
+                IntStream.range(0, numElements)
+                        .mapToObj(String::valueOf)
+                        .collect(Collectors.toList());
+        for (String element : elements) {
+            client.sendMessage(TEST_STREAM, element);
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        Properties config = kinesalite.getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        FlinkKinesisConsumer<String> consumer =
+                new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+
+        // call stop with savepoint in another thread
+        ForkJoinTask<Object> stopTask =
+                ForkJoinPool.commonPool()
+                        .submit(
+                                () -> {
+                                    WaitingMapper.firstElement.await();
+                                    stopWithSavepoint();
+                                    WaitingMapper.stopped = true;
+                                    return null;
+                                });
+
+        try {
+            List<String> result =
+                    env.addSource(consumer).map(new WaitingMapper()).executeAndCollect(10000);
+            // stop with savepoint will most likely only return a small subset of the elements
+            // validate that the prefix is as expected
+            assertThat(result, hasSize(lessThan(numElements)));
+            assertThat(result, equalTo(elements.subList(0, result.size())));
+        } finally {
+            stopTask.cancel(true);
+        }
+    }
+
+    private String stopWithSavepoint() throws Exception {
+        JobStatusMessage job =
+                miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+        return miniCluster
+                .getClusterClient()
+                .stopWithSavepoint(job.getJobId(), true, temp.getRoot().getAbsolutePath())
+                .get();
+    }
+
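+    /**
+     * Throttles each record with a sleep until the job has been stopped, keeping the mailbox busy
+     * so that pending records pile up while stop-with-savepoint runs.
+     */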
+    private static class WaitingMapper implements MapFunction<String, String> {
+        static CountDownLatch firstElement = new CountDownLatch(1);
+        static volatile boolean stopped = false;
+
+        @Override
+        public String map(String value) throws Exception {
+            if (firstElement.getCount() > 0) {
+                firstElement.countDown();
+            }
+            if (!stopped) {
+                Thread.sleep(100);
+            }
+            return value;
+        }
+    }
+}