You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/07/25 08:58:05 UTC
[flink] 05/05: [FLINK-23528][connectors/kinesis] Graceful shutdown of Kinesis Consumer in EFO mode
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7cd5fe93035c5d875d10d9f3cb1d84d4ed045dc0
Author: Krzysztof Dziolak <dz...@amazon.com>
AuthorDate: Fri Jul 1 10:08:45 2022 +0100
[FLINK-23528][connectors/kinesis] Graceful shutdown of Kinesis Consumer in EFO mode
---
.../kinesis/internals/KinesisDataFetcher.java | 7 ++-
.../publisher/fanout/FanOutShardSubscriber.java | 3 +-
.../connectors/kinesis/FlinkKinesisITCase.java | 68 +++++++++++++++-------
.../fanout/FanOutShardSubscriberTest.java | 20 +++++++
4 files changed, 74 insertions(+), 24 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index e12bdaa0b3a..4fcc80a250e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -819,7 +819,7 @@ public class KinesisDataFetcher<T> {
LOG.warn("Encountered exception closing record publisher factory", e);
}
} finally {
- shardConsumersExecutor.shutdown();
+ gracefulShutdownShardConsumers();
cancelFuture.complete(null);
@@ -852,6 +852,11 @@ public class KinesisDataFetcher<T> {
StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
}
+ /** Gracefully stops shardConsumersExecutor without interrupting running threads. */
+ private void gracefulShutdownShardConsumers() {
+ shardConsumersExecutor.shutdown();
+ }
+
/**
* Returns a flag indicating if this fetcher is running.
*
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index afda248773c..a280a8f6537 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -361,8 +361,7 @@ public class FanOutShardSubscriber {
}
} else if (subscriptionEvent.isSubscriptionComplete()) {
// The subscription is complete, but the shard might not be, so we return incomplete
- result = false;
- break;
+ return false;
} else {
handleError(subscriptionEvent.getThrowable());
result = false;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 705be463475..ee8066a61aa 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -39,7 +39,7 @@ import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TestNameProvider;
import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -58,6 +58,8 @@ import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -67,13 +69,13 @@ public class FlinkKinesisITCase extends TestLogger {
private String stream;
private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class);
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER =
+ @Rule
+ public final MiniClusterWithClientResource miniCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder().build());
- @ClassRule
- public static KinesaliteContainer kinesalite =
+ @Rule
+ public KinesaliteContainer kinesalite =
new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE));
@Rule public TemporaryFolder temp = new TemporaryFolder();
@@ -86,19 +88,31 @@ public class FlinkKinesisITCase extends TestLogger {
@Before
public void setupClient() throws Exception {
- client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+ client = new KinesisPubsubClient(getContainerProperties());
stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
client.createTopic(stream, 1, new Properties());
}
@Test
public void testStopWithSavepoint() throws Exception {
- testStopWithSavepoint(false);
+ testStopWithSavepoint(false, false);
}
@Test
public void testStopWithSavepointWithDrain() throws Exception {
- testStopWithSavepoint(true);
+ testStopWithSavepoint(true, false);
+ }
+
+ @Test
+ @Ignore("Kinesalite does not support EFO")
+ public void testStopWithSavepointWithEfo() throws Exception {
+ testStopWithSavepoint(false, true);
+ }
+
+ @Test
+ @Ignore("Kinesalite does not support EFO")
+ public void testStopWithSavepointWithDrainAndEfo() throws Exception {
+ testStopWithSavepoint(true, true);
}
/**
@@ -113,7 +127,7 @@ public class FlinkKinesisITCase extends TestLogger {
* <li>With the fix, the job proceeds and we can lift the backpressure.
* </ol>
*/
- private void testStopWithSavepoint(boolean drain) throws Exception {
+ private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception {
// add elements to the test stream
int numElements = 1000;
client.sendMessage(
@@ -124,14 +138,10 @@ public class FlinkKinesisITCase extends TestLogger {
env.setParallelism(1);
env.enableCheckpointing(100L);
- Properties config = kinesalite.getContainerProperties();
- config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
- FlinkKinesisConsumer<String> consumer =
- new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
-
SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
- DataStream<String> stream =
- env.addSource(consumer).map(new WaitingMapper(savepointTrigger));
+ DataStream<String> outputStream =
+ env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger));
+
// call stop with savepoint in another thread
ForkJoinTask<Object> stopTask =
ForkJoinPool.commonPool()
@@ -142,7 +152,7 @@ public class FlinkKinesisITCase extends TestLogger {
return null;
});
try {
- List<String> result = stream.executeAndCollect(10000);
+ List<String> result = outputStream.executeAndCollect(10000);
// stop with savepoint will most likely only return a small subset of the elements
// validate that the prefix is as expected
if (drain) {
@@ -167,10 +177,24 @@ public class FlinkKinesisITCase extends TestLogger {
}
}
+ private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) {
+ Properties config = getContainerProperties();
+ config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+ if (efo) {
+ config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
+ config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app");
+ }
+ return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
+ }
+
+ private Properties getContainerProperties() {
+ return kinesalite.getContainerProperties();
+ }
+
private String stopWithSavepoint(boolean drain) throws Exception {
JobStatusMessage job =
- MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
- return MINI_CLUSTER
+ miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+ return miniCluster
.getClusterClient()
.stopWithSavepoint(
job.getJobId(),
@@ -190,13 +214,15 @@ public class FlinkKinesisITCase extends TestLogger {
WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
this.savepointTrigger = savepointTrigger;
- checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+ // effectively set 10 minute timeout on the wait
+ // this is reduced to 1 second once the data starts flowing
+ checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
}
private void readObject(ObjectInputStream stream)
throws ClassNotFoundException, IOException {
stream.defaultReadObject();
- checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+ checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
}
@Override
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index a41e7026984..9c711230096 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -31,6 +31,7 @@ import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import java.time.Duration;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT;
+import static org.junit.Assert.assertFalse;
/** Tests for {@link FanOutShardSubscriber}. */
public class FanOutShardSubscriberTest {
@@ -120,6 +121,25 @@ public class FanOutShardSubscriberTest {
subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
}
+ @Test
+ public void testSubscriptionCompletion() throws Exception {
+ FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2 errorKinesisV2 =
+ FakeKinesisFanOutBehavioursFactory.emptyBatchFollowedBySingleRecord();
+
+ FanOutShardSubscriber subscriber =
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ errorKinesisV2,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
+
+ StartingPosition startingPosition = StartingPosition.builder().build();
+ boolean result =
+ subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+
+ assertFalse(result);
+ }
+
@Test
public void testTimeoutSubscribingToShard() throws Exception {
thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);