You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2019/07/22 16:43:06 UTC

[flink] branch release-1.9 updated: [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 7ea55e9  [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock
7ea55e9 is described below

commit 7ea55e967bc450b3b744edcaea23834646e439cd
Author: Shannon Carey <re...@gmail.com>
AuthorDate: Sat Jul 20 14:15:50 2019 -0500

    [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock
    
    - Inside testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown,
    consumerThread.interrupt() was getting absorbed inside
    KinesisDataFetcher's while(running) loop, so
    TestableKinesisDataFetcherForShardConsumerException's awaitTermination()
    was never interrupted by it. This led to a deadlock, with
    KinesisDataFetcher waiting on the test code to send the interrupt, and
    the test code waiting for KinesisDataFetcher to throw the expected
    exception.
    - Now, the test code waits until KinesisDataFetcher is inside
    awaitTermination() before producing the interrupt, so it can be sure
    that the interrupt it produces will be received/handled inside
    awaitTermination().
---
 .../kinesis/internals/KinesisDataFetcherTest.java         |  6 +++++-
 ...stableKinesisDataFetcherForShardConsumerException.java | 15 ++++++++++++++-
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 5255e61..2815193 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -846,7 +846,7 @@ public class KinesisDataFetcherTest extends TestLogger {
 		DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(
 			TestUtils.getStandardProperties(), fetcher, 1, 0);
 
-		CheckedThread consumerThread = new CheckedThread() {
+		CheckedThread consumerThread = new CheckedThread("FlinkKinesisConsumer") {
 			@Override
 			public void go() throws Exception {
 				consumer.run(new TestSourceContext<>());
@@ -858,6 +858,10 @@ public class KinesisDataFetcherTest extends TestLogger {
 		// ShardConsumer exception (from deserializer) will result in fetcher being shut down.
 		fetcher.waitUntilShutdown(20, TimeUnit.SECONDS);
 
+		// Ensure that KinesisDataFetcher has exited its while(running) loop and is inside its awaitTermination()
+		// method before we interrupt its thread, so that our interrupt doesn't get absorbed by any other mechanism.
+		fetcher.waitUntilAwaitTermination(20, TimeUnit.SECONDS);
+
 		// Interrupt the thread so that KinesisDataFetcher#awaitTermination() will throw InterruptedException.
 		consumerThread.interrupt();
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
index c08b7af..6ae4391 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -32,6 +33,8 @@ import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -39,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * {@link #awaitTermination()}.
  */
 public class TestableKinesisDataFetcherForShardConsumerException<T> extends TestableKinesisDataFetcher<T> {
+	public volatile boolean wasInterrupted = false;
+
+	private OneShotLatch awaitTerminationWaiter = new OneShotLatch();
+
 	public TestableKinesisDataFetcherForShardConsumerException(final List<String> fakeStreams,
 			final SourceFunction.SourceContext<T> sourceContext,
 			final Properties fakeConfiguration,
@@ -54,7 +61,12 @@ public class TestableKinesisDataFetcherForShardConsumerException<T> extends Test
 			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, fakeKinesis);
 	}
 
-	public volatile boolean wasInterrupted = false;
+	/**
+	 * Block until awaitTermination() has been called on this class.
+	 */
+	public void waitUntilAwaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+		awaitTerminationWaiter.await(timeout, timeUnit);
+	}
 
 	@Override
 	protected ExecutorService createShardConsumersThreadPool(final String subtaskName) {
@@ -65,6 +77,7 @@ public class TestableKinesisDataFetcherForShardConsumerException<T> extends Test
 
 	@Override
 	public void awaitTermination() throws InterruptedException {
+		awaitTerminationWaiter.trigger();
 		try {
 			// Force this method to only exit by thread getting interrupted.
 			while (true) {