You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/12/04 13:40:52 UTC
[flink] branch release-1.13 updated: [FLINK-24506][config] Adds checkpoint directory to CheckpointConfig
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 98c49d6 [FLINK-24506][config] Adds checkpoint directory to CheckpointConfig
98c49d6 is described below
commit 98c49d63efa7723ed5394e503f7f0c3f54061456
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Tue Nov 30 18:02:17 2021 +0100
[FLINK-24506][config] Adds checkpoint directory to CheckpointConfig
This enables the user to pass in the checkpoint directory through the Flink
configuration again. This fixes an issue that was introduced by FLINK-19463.
---
.../api/environment/CheckpointConfig.java | 4 +
.../CheckpointConfigFromConfigurationTest.java | 102 +++++++++++++++++++--
2 files changed, 99 insertions(+), 7 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 6b0275b..3cf2e86 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorage;
@@ -756,5 +757,8 @@ public class CheckpointConfig implements java.io.Serializable {
configuration
.getOptional(ExecutionCheckpointingOptions.FORCE_UNALIGNED)
.ifPresent(this::setForceUnalignedCheckpoints);
+ configuration
+ .getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY)
+ .ifPresent(this::setCheckpointStorage);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
index 0bbab99..09ae9af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
@@ -18,10 +18,18 @@
package org.apache.flink.streaming.api.environment;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.Preconditions;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -31,8 +39,7 @@ import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Function;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
/**
* Tests for configuring {@link CheckpointConfig} via {@link
@@ -42,7 +49,7 @@ import static org.junit.Assert.assertThat;
public class CheckpointConfigFromConfigurationTest {
@Parameterized.Parameters(name = "{0}")
- public static Collection<TestSpec> specs() {
+ public static Collection<TestSpec<?>> specs() {
return Arrays.asList(
TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE)
.whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
@@ -96,10 +103,82 @@ public class CheckpointConfigFromConfigurationTest {
.whenSetFromFile("execution.checkpointing.unaligned", "true")
.viaSetter(CheckpointConfig::enableUnalignedCheckpoints)
.getterVia(CheckpointConfig::isUnalignedCheckpointsEnabled)
- .nonDefaultValue(true));
+ .nonDefaultValue(true),
+ TestSpec.testValue(
+ (CheckpointStorage)
+ new FileSystemCheckpointStorage(
+ "file:///path/to/checkpoint/dir"))
+ .whenSetFromFile(
+ CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
+ "file:///path/to/checkpoint/dir")
+ .viaSetter(CheckpointConfig::setCheckpointStorage)
+ .getterVia(CheckpointConfig::getCheckpointStorage)
+ .nonDefaultValue(
+ new FileSystemCheckpointStorage("file:///path/to/checkpoint/dir"))
+ .customMatcher(FileSystemCheckpointStorageMatcher::new));
}
- @Parameterized.Parameter public TestSpec spec;
+ /**
+ * {@code FileSystemCheckpointStorageMatcher} verifies that the set {@link CheckpointStorage} is
+ * of type {@link FileSystemCheckpointStorage} pointing to the same filesystem path.
+ */
+ private static class FileSystemCheckpointStorageMatcher
+ extends TypeSafeMatcher<CheckpointStorage> {
+ private static final Class<FileSystemCheckpointStorage> EXPECTED_CHECKPOINT_STORAGE_CLASS =
+ FileSystemCheckpointStorage.class;
+ private final FileSystemCheckpointStorage fileSystemCheckpointStorageFromSetter;
+
+ public FileSystemCheckpointStorageMatcher(CheckpointStorage checkpointStorageFromSetter) {
+ Preconditions.checkArgument(
+ checkpointStorageFromSetter
+ .getClass()
+ .equals(EXPECTED_CHECKPOINT_STORAGE_CLASS));
+ this.fileSystemCheckpointStorageFromSetter =
+ (FileSystemCheckpointStorage) checkpointStorageFromSetter;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ describeObject(fileSystemCheckpointStorageFromSetter, description);
+ }
+
+ @Override
+ protected void describeMismatchSafely(
+ CheckpointStorage checkpointStorageFromFile, Description description) {
+ if (!checkpointStorageFromFile.getClass().equals(EXPECTED_CHECKPOINT_STORAGE_CLASS)) {
+ description.appendText(
+ "Passed object is not of type "
+ + EXPECTED_CHECKPOINT_STORAGE_CLASS.getCanonicalName());
+ } else {
+ describeObject(
+ (FileSystemCheckpointStorage) checkpointStorageFromFile, description);
+ }
+ super.describeMismatchSafely(checkpointStorageFromFile, description);
+ }
+
+ private static void describeObject(
+ FileSystemCheckpointStorage fileSystemCheckpointStorage, Description description) {
+ description
+ .appendText(EXPECTED_CHECKPOINT_STORAGE_CLASS.getCanonicalName())
+ .appendText("(")
+ .appendText(fileSystemCheckpointStorage.getCheckpointPath().toString())
+ .appendText(")");
+ }
+
+ @Override
+ protected boolean matchesSafely(CheckpointStorage checkpointStorageFromFile) {
+ if (!checkpointStorageFromFile.getClass().equals(EXPECTED_CHECKPOINT_STORAGE_CLASS)) {
+ return false;
+ }
+ final FileSystemCheckpointStorage fileSystemCheckpointStorageFromFile =
+ (FileSystemCheckpointStorage) checkpointStorageFromFile;
+ return fileSystemCheckpointStorageFromFile
+ .getCheckpointPath()
+ .equals(fileSystemCheckpointStorageFromSetter.getCheckpointPath());
+ }
+ }
+
+ @Parameterized.Parameter public TestSpec<?> spec;
@Test
public void testLoadingFromConfiguration() {
@@ -133,6 +212,8 @@ public class CheckpointConfigFromConfigurationTest {
private BiConsumer<CheckpointConfig, T> setter;
private Function<CheckpointConfig, T> getter;
+ private Function<T, Matcher<T>> createMatcher = CoreMatchers::equalTo;
+
private TestSpec(T value) {
this.objectValue = value;
}
@@ -162,6 +243,11 @@ public class CheckpointConfigFromConfigurationTest {
return this;
}
+ public TestSpec<T> customMatcher(Function<T, Matcher<T>> customMatcher) {
+ this.createMatcher = customMatcher;
+ return this;
+ }
+
public void setValue(CheckpointConfig config) {
setter.accept(config, objectValue);
}
@@ -172,11 +258,13 @@ public class CheckpointConfigFromConfigurationTest {
public void assertEqual(
CheckpointConfig configFromFile, CheckpointConfig configFromSetters) {
- assertThat(getter.apply(configFromFile), equalTo(getter.apply(configFromSetters)));
+ assertThat(
+ getter.apply(configFromFile),
+ createMatcher.apply(getter.apply(configFromSetters)));
}
public void assertEqualNonDefault(CheckpointConfig configFromFile) {
- assertThat(getter.apply(configFromFile), equalTo(nonDefaultValue));
+ assertThat(getter.apply(configFromFile), createMatcher.apply(nonDefaultValue));
}
@Override