You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/12/21 10:32:43 UTC
[flink-connector-aws] branch main updated: [FLINK-30224][Connectors/Kinesis] Added an IT test for slow FlinkKinesisConsumer's run which caused NPE in close (#41)
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 9ee0fe3 [FLINK-30224][Connectors/Kinesis] Added an IT test for slow FlinkKinesisConsumer's run which caused NPE in close (#41)
9ee0fe3 is described below
commit 9ee0fe32d1a9e6d62e514824ab553956fe88ee9d
Author: Astamur Kirillin <as...@gmail.com>
AuthorDate: Wed Dec 21 10:32:38 2022 +0000
[FLINK-30224][Connectors/Kinesis] Added an IT test for slow FlinkKinesisConsumer's run which caused NPE in close (#41)
Co-authored-by: Astamur Kirillin <as...@amazon.com>
---
.../connectors/kinesis/FlinkKinesisITCase.java | 99 +++++++++++++++++++++-
1 file changed, 97 insertions(+), 2 deletions(-)
diff --git a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 7f7676a..6066d82 100644
--- a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
@@ -29,7 +30,10 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
@@ -38,6 +42,7 @@ import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TestNameProvider;
+import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
@@ -52,9 +57,12 @@ import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -63,6 +71,8 @@ import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertTrue;
/** IT cases for using Kinesis consumer/producer based on Kinesalite. */
public class FlinkKinesisITCase extends TestLogger {
@@ -115,6 +125,40 @@ public class FlinkKinesisITCase extends TestLogger {
testStopWithSavepoint(true, true);
}
+    /**
+     * Tests stop-with-savepoint while {@link
+     * FlinkKinesisConsumer#run(SourceFunction.SourceContext)} is still blocked in a slow run
+     * method and has not created its {@link KinesisDataFetcher} yet (FLINK-30224).
+     *
+     * <p>The source and a background stop task rendezvous twice on a shared two-party {@link
+     * CyclicBarrier}. The first trip tells the stop task that the source's run method has been
+     * entered. The second trip releases the source only after stop-with-savepoint has been
+     * requested.
+     */
+    @Test
+    public void testStopWithSavepointWithSlowConsumer() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100L);
+
+        SharedReference<CyclicBarrier> savepointTrigger = sharedObjects.add(new CyclicBarrier(2));
+        env.addSource(createSlowKinesisConsumer(savepointTrigger)).addSink(new DiscardingSink<>());
+
+        ForkJoinTask<String> stopTask =
+                ForkJoinPool.commonPool()
+                        .submit(
+                                () -> {
+                                    // Wait until run method is reached in the consumer
+                                    savepointTrigger.get().await();
+                                    String result = stopWithSavepoint(savepointTrigger);
+                                    log.info("StopWithSavepoint result: {}", result);
+                                    return result;
+                                });
+
+        // Blocks until the job has been stopped by the background stop task.
+        env.execute();
+        String result = stopTask.get(1, TimeUnit.MINUTES);
+
+        MatcherAssert.assertThat("Savepoint result shouldn't be null", result, notNullValue());
+        assertTrue(
+                "Unexpected savepoint file's name format",
+                result.matches("^file:.*/savepoint-.*$"));
+    }
+
/**
* Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
*
@@ -183,21 +227,42 @@ public class FlinkKinesisITCase extends TestLogger {
return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
}
+ private FlinkKinesisConsumer<String> createSlowKinesisConsumer(
+ SharedReference<CyclicBarrier> savepointTrigger) {
+ Properties config = getContainerProperties();
+ config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+ return new SlowFlinkKinesisConsumer<>(stream, STRING_SCHEMA, config, savepointTrigger);
+ }
+
+ /** Returns the properties exposed by the Kinesalite test container, used as the base Kinesis client configuration. */
private Properties getContainerProperties() {
return kinesalite.getContainerProperties();
}
+ /** Stops the first listed job with a savepoint and blocks until the savepoint location is returned. */
private String stopWithSavepoint(boolean drain) throws Exception {
+ return callStopWithSavepoint(drain).get();
+ }
+
+ /**
+ * Stops the job (without draining) while the slow consumer is still blocked in its run method.
+ * The stop request is issued first. Only then is the consumer released through the second
+ * rendezvous on {@code savepointTrigger}. Blocks until the savepoint location is returned.
+ */
+ private String stopWithSavepoint(SharedReference<CyclicBarrier> savepointTrigger)
+ throws Exception {
+ CompletableFuture<String> resultFuture = callStopWithSavepoint(false);
+ // Release barrier in consumer after stop was called
+ savepointTrigger.get().await();
+
+ return resultFuture.get();
+ }
+
+ /**
+ * Asynchronously triggers stop-with-savepoint for the first job listed by the mini cluster,
+ * writing a CANONICAL-format savepoint into the temporary folder.
+ *
+ * @param drain forwarded as the drain flag of the stop-with-savepoint request
+ * @return a future that completes with the savepoint location
+ */
+ private CompletableFuture<String> callStopWithSavepoint(boolean drain) throws Exception {
+ // Picks the first listed job (presumably the only one submitted by the test); Optional#get throws if none is listed.
JobStatusMessage job =
miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+
+ log.info("Calling stopWithSavepoint: {}", job.getJobId());
return miniCluster
.getClusterClient()
.stopWithSavepoint(
job.getJobId(),
drain,
temp.getRoot().getAbsolutePath(),
- SavepointFormatType.CANONICAL)
- .get();
+ SavepointFormatType.CANONICAL);
}
private static class WaitingMapper
@@ -248,4 +313,34 @@ public class FlinkKinesisITCase extends TestLogger {
@Override
public void initializeState(FunctionInitializationContext context) {}
}
+
+    /**
+     * A {@link FlinkKinesisConsumer} with an artificially slow {@code run()}. It does not start
+     * consuming until the test has requested stop-with-savepoint, so the stop arrives before a
+     * {@link KinesisDataFetcher} has been created.
+     *
+     * <p>Coordination uses a two-party {@link CyclicBarrier} shared with the test thread. The
+     * barrier is reused for the second rendezvous without calling {@link CyclicBarrier#reset()}.
+     * A cyclic barrier re-arms itself automatically once it trips. Calling {@code reset()} would
+     * instead break the barrier for a test thread already waiting on it, making that thread fail
+     * with a {@code BrokenBarrierException}.
+     */
+    private static class SlowFlinkKinesisConsumer<T> extends FlinkKinesisConsumer<T> {
+        private final SharedReference<CyclicBarrier> savepointTrigger;
+
+        public SlowFlinkKinesisConsumer(
+                String stream,
+                DeserializationSchema<T> deserializer,
+                Properties configProps,
+                SharedReference<CyclicBarrier> savepointTrigger) {
+            super(stream, deserializer, configProps);
+            this.savepointTrigger = savepointTrigger;
+        }
+
+        @Override
+        public void run(SourceContext<T> sourceContext) throws Exception {
+            CyclicBarrier barrier = savepointTrigger.get();
+
+            // First rendezvous: tell the test that run() has been entered, i.e. the job and the
+            // task are both RUNNING and stop-with-savepoint may be issued.
+            barrier.await();
+
+            // Second rendezvous: imitate a slow run() by blocking until the test has called
+            // stop-with-savepoint. The barrier re-armed itself after the first trip, so no
+            // reset() is needed (and calling it here could break the test thread's await()).
+            barrier.await();
+
+            super.run(sourceContext);
+        }
+    }
}