You are viewing a plain-text version of this content. The canonical version is available via the original archive link (the hyperlink was lost in this plain-text copy).
Posted to commits@flink.apache.org by th...@apache.org on 2019/07/22 16:43:06 UTC
[flink] branch release-1.9 updated: [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 7ea55e9 [FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock
7ea55e9 is described below
commit 7ea55e967bc450b3b744edcaea23834646e439cd
Author: Shannon Carey <re...@gmail.com>
AuthorDate: Sat Jul 20 14:15:50 2019 -0500
[FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock
- Inside testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown,
consumerThread.interrupt() was being absorbed inside
KinesisDataFetcher's while(running) loop, so
TestableKinesisDataFetcherForShardConsumerException's awaitTermination()
was never interrupted by it. This led to a deadlock: KinesisDataFetcher
waited for the test code to send the interrupt, while the test code
waited for KinesisDataFetcher to throw the expected exception.
- Now, the test code waits until KinesisDataFetcher is inside
awaitTermination() before producing the interrupt, so it can be sure
that the interrupt will be received and handled inside
awaitTermination().
---
.../kinesis/internals/KinesisDataFetcherTest.java | 6 +++++-
...stableKinesisDataFetcherForShardConsumerException.java | 15 ++++++++++++++-
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 5255e61..2815193 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -846,7 +846,7 @@ public class KinesisDataFetcherTest extends TestLogger {
DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(
TestUtils.getStandardProperties(), fetcher, 1, 0);
- CheckedThread consumerThread = new CheckedThread() {
+ CheckedThread consumerThread = new CheckedThread("FlinkKinesisConsumer") {
@Override
public void go() throws Exception {
consumer.run(new TestSourceContext<>());
@@ -858,6 +858,10 @@ public class KinesisDataFetcherTest extends TestLogger {
// ShardConsumer exception (from deserializer) will result in fetcher being shut down.
fetcher.waitUntilShutdown(20, TimeUnit.SECONDS);
+ // Ensure that KinesisDataFetcher has exited its while(running) loop and is inside its awaitTermination()
+ // method before we interrupt its thread, so that our interrupt doesn't get absorbed by any other mechanism.
+ fetcher.waitUntilAwaitTermination(20, TimeUnit.SECONDS);
+
// Interrupt the thread so that KinesisDataFetcher#awaitTermination() will throw InterruptedException.
consumerThread.interrupt();
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
index c08b7af..6ae4391 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kinesis.testutils;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -32,6 +33,8 @@ import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -39,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference;
* {@link #awaitTermination()}.
*/
public class TestableKinesisDataFetcherForShardConsumerException<T> extends TestableKinesisDataFetcher<T> {
+ public volatile boolean wasInterrupted = false;
+
+ private OneShotLatch awaitTerminationWaiter = new OneShotLatch();
+
public TestableKinesisDataFetcherForShardConsumerException(final List<String> fakeStreams,
final SourceFunction.SourceContext<T> sourceContext,
final Properties fakeConfiguration,
@@ -54,7 +61,12 @@ public class TestableKinesisDataFetcherForShardConsumerException<T> extends Test
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, fakeKinesis);
}
- public volatile boolean wasInterrupted = false;
+ /**
+ * Block until awaitTermination() has been called on this class.
+ */
+ public void waitUntilAwaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+ awaitTerminationWaiter.await(timeout, timeUnit);
+ }
@Override
protected ExecutorService createShardConsumersThreadPool(final String subtaskName) {
@@ -65,6 +77,7 @@ public class TestableKinesisDataFetcherForShardConsumerException<T> extends Test
@Override
public void awaitTermination() throws InterruptedException {
+ awaitTerminationWaiter.trigger();
try {
// Force this method to only exit by thread getting interrupted.
while (true) {