You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/21 17:39:37 UTC
[flink] 02/04: [FLINK-17258][network][test] Run
ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned
checkpoints
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b3a78515870d3a9b461130b6cfc92e72412ed209
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu May 14 20:34:22 2020 +0200
[FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints
This test needs many concurrent checkpoints & savepoints, and this is currently not supported with unaligned checkpoints.
---
.../java/org/apache/flink/test/classloading/ClassLoaderITCase.java | 4 +++-
.../org/apache/flink/test/classloading/jar/CustomKvStateProgram.java | 5 ++++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 6ea054a..974512c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -313,7 +313,9 @@ public class ClassLoaderITCase extends TestLogger {
String.valueOf(parallelism),
checkpointDir.toURI().toString(),
"5000",
- outputDir.toURI().toString()})
+ outputDir.toURI().toString(),
+ "false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
+ })
.build();
TestStreamEnvironment.setAsContext(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index 954b8df..d6f4aa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.InfiniteIntegerSource;
import org.apache.flink.util.Collector;
+import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -47,10 +48,12 @@ public class CustomKvStateProgram {
final String checkpointPath = args[1];
final int checkpointingInterval = Integer.parseInt(args[2]);
final String outputPath = args[3];
+ final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
- env.enableCheckpointing(checkpointingInterval);
+ env.enableCheckpointing(checkpointingInterval);
+ unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
env.setStateBackend(new FsStateBackend(checkpointPath));
DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());