You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/21 17:39:37 UTC

[flink] 02/04: [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3a78515870d3a9b461130b6cfc92e72412ed209
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu May 14 20:34:22 2020 +0200

    [FLINK-17258][network][test] Run ClassLoaderITCase#testDisposeSavepointWithCustomKvState without unaligned checkpoints
    
    This test needs many concurrent checkpoints & savepoints, and this is currently not supported with unaligned checkpoints.
---
 .../java/org/apache/flink/test/classloading/ClassLoaderITCase.java   | 4 +++-
 .../org/apache/flink/test/classloading/jar/CustomKvStateProgram.java | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 6ea054a..974512c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -313,7 +313,9 @@ public class ClassLoaderITCase extends TestLogger {
 				String.valueOf(parallelism),
 				checkpointDir.toURI().toString(),
 				"5000",
-				outputDir.toURI().toString()})
+				outputDir.toURI().toString(),
+				"false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
+			})
 			.build();
 
 		TestStreamEnvironment.setAsContext(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index 954b8df..d6f4aa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.InfiniteIntegerSource;
 import org.apache.flink.util.Collector;
 
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -47,10 +48,12 @@ public class CustomKvStateProgram {
 		final String checkpointPath = args[1];
 		final int checkpointingInterval = Integer.parseInt(args[2]);
 		final String outputPath = args[3];
+		final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
-				env.enableCheckpointing(checkpointingInterval);
+		env.enableCheckpointing(checkpointingInterval);
+		unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
 		env.setStateBackend(new FsStateBackend(checkpointPath));
 
 		DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());