You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/07/25 08:58:04 UTC
[flink] 04/05: [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8fb2c3a4cf3a37b87403e462f675bb5d614a4b51
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Sep 7 22:23:15 2021 +0200
[FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.
---
.../connectors/kinesis/FlinkKinesisITCase.java | 141 +++++++++++++++------
.../kinesis/internals/KinesisDataFetcherTest.java | 11 +-
2 files changed, 105 insertions(+), 47 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 6a394860450..705be463475 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -19,44 +19,53 @@ package org.apache.flink.streaming.connectors.kinesis;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestNameProvider;
import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.utility.DockerImageName;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
/** IT cases for using Kinesis consumer/producer based on Kinesalite. */
-@Ignore("See FLINK-23528")
public class FlinkKinesisITCase extends TestLogger {
- public static final String TEST_STREAM = "test_stream";
+ private String stream;
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class);
@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER =
@@ -69,20 +78,34 @@ public class FlinkKinesisITCase extends TestLogger {
@Rule public TemporaryFolder temp = new TemporaryFolder();
+ @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
private KinesisPubsubClient client;
@Before
- public void setupClient() {
+ public void setupClient() throws Exception {
client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+ stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
+ client.createTopic(stream, 1, new Properties());
+ }
+
+ @Test
+ public void testStopWithSavepoint() throws Exception {
+ testStopWithSavepoint(false);
+ }
+
+ @Test
+ public void testStopWithSavepointWithDrain() throws Exception {
+ testStopWithSavepoint(true);
}
/**
* Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
*
* <ol>
- * <li>The test setups up a stream with 100 records and creates a Flink job that reads them
+ * <li>The test sets up a stream with 1000 records and creates a Flink job that reads them
* with very slowly (using up a large chunk of time of the mailbox).
* <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
* <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
@@ -90,83 +113,117 @@ public class FlinkKinesisITCase extends TestLogger {
* <li>With the fix, the job proceeds and we can lift the backpressure.
* </ol>
*/
- @Test
- public void testStopWithSavepoint() throws Exception {
- client.createTopic(TEST_STREAM, 1, new Properties());
-
+ private void testStopWithSavepoint(boolean drain) throws Exception {
// add elements to the test stream
int numElements = 1000;
client.sendMessage(
- TEST_STREAM,
+ stream,
IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
+ env.enableCheckpointing(100L);
Properties config = kinesalite.getContainerProperties();
config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
FlinkKinesisConsumer<String> consumer =
- new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+ new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
- DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper());
+ SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
+ DataStream<String> stream =
+ env.addSource(consumer).map(new WaitingMapper(savepointTrigger));
// call stop with savepoint in another thread
ForkJoinTask<Object> stopTask =
ForkJoinPool.commonPool()
.submit(
() -> {
- WaitingMapper.firstElement.await();
- stopWithSavepoint();
- WaitingMapper.stopped = true;
+ savepointTrigger.get().await();
+ stopWithSavepoint(drain);
return null;
});
try {
List<String> result = stream.executeAndCollect(10000);
// stop with savepoint will most likely only return a small subset of the elements
// validate that the prefix is as expected
- assertThat(result, hasSize(lessThan(numElements)));
- assertThat(
- result,
- equalTo(
- IntStream.range(0, numElements)
- .mapToObj(String::valueOf)
- .collect(Collectors.toList())
- .subList(0, result.size())));
+ if (drain) {
+ assertThat(
+ result,
+ contains(
+ IntStream.range(0, numElements)
+ .mapToObj(String::valueOf)
+ .toArray()));
+ } else {
+ // stop with savepoint will most likely only return a small subset of the elements
+ // validate that the prefix is as expected
+ assertThat(
+ result,
+ contains(
+ IntStream.range(0, result.size())
+ .mapToObj(String::valueOf)
+ .toArray()));
+ }
} finally {
- stopTask.cancel(true);
+ stopTask.get();
}
}
- private String stopWithSavepoint() throws Exception {
+ private String stopWithSavepoint(boolean drain) throws Exception {
JobStatusMessage job =
MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
return MINI_CLUSTER
.getClusterClient()
.stopWithSavepoint(
job.getJobId(),
- true,
+ drain,
temp.getRoot().getAbsolutePath(),
SavepointFormatType.CANONICAL)
.get();
}
- private static class WaitingMapper implements MapFunction<String, String> {
- static CountDownLatch firstElement;
- static volatile boolean stopped;
+ private static class WaitingMapper
+ implements MapFunction<String, String>, CheckpointedFunction {
+ private final SharedReference<CountDownLatch> savepointTrigger;
+ private volatile boolean savepointTriggered;
+ // keep track of when the last checkpoint occurred
+ private transient Deadline checkpointDeadline;
+ private final AtomicInteger numElements = new AtomicInteger();
+
+ WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
+ this.savepointTrigger = savepointTrigger;
+ checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+ }
- WaitingMapper() {
- firstElement = new CountDownLatch(1);
- stopped = false;
+ private void readObject(ObjectInputStream stream)
+ throws ClassNotFoundException, IOException {
+ stream.defaultReadObject();
+ checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
}
@Override
public String map(String value) throws Exception {
- if (firstElement.getCount() > 0) {
- firstElement.countDown();
- }
- if (!stopped) {
- Thread.sleep(100);
+ numElements.incrementAndGet();
+ if (!savepointTriggered) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ savepointTriggered = checkpointDeadline.isOverdue();
}
return value;
}
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) {
+ // assume that after the first savepoint, this function will only see a new checkpoint
+ // when the final savepoint is triggered
+ if (numElements.get() > 0) {
+ this.checkpointDeadline = Deadline.fromNow(Duration.ofSeconds(1));
+ savepointTrigger.get().countDown();
+ }
+ LOG.info("snapshotState {} {}", context.getCheckpointId(), numElements);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) {}
}
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 8be2bb41097..9f6d8e3f27f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,11 +17,6 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.testutils.CheckedThread;
@@ -49,6 +44,12 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDa
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.reflect.Whitebox;