You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/12/04 13:40:52 UTC

[flink] branch release-1.13 updated: [FLINK-24506][config] Adds checkpoint directory to CheckpointConfig

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 98c49d6  [FLINK-24506][config] Adds checkpoint directory to CheckpointConfig
98c49d6 is described below

commit 98c49d63efa7723ed5394e503f7f0c3f54061456
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Tue Nov 30 18:02:17 2021 +0100

    [FLINK-24506][config] Adds checkpoint directory to CheckpointConfig
    
    This enables the user to pass in the checkpoint directory through the Flink
    configuration again. This fixes an issue that was introduced by FLINK-19463.
---
 .../api/environment/CheckpointConfig.java          |   4 +
 .../CheckpointConfigFromConfigurationTest.java     | 102 +++++++++++++++++++--
 2 files changed, 99 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 6b0275b..3cf2e86 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStorage;
@@ -756,5 +757,8 @@ public class CheckpointConfig implements java.io.Serializable {
         configuration
                 .getOptional(ExecutionCheckpointingOptions.FORCE_UNALIGNED)
                 .ifPresent(this::setForceUnalignedCheckpoints);
+        configuration
+                .getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY)
+                .ifPresent(this::setCheckpointStorage);
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
index 0bbab99..09ae9af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
@@ -18,10 +18,18 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.Preconditions;
 
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -31,8 +39,7 @@ import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
  * Tests for configuring {@link CheckpointConfig} via {@link
@@ -42,7 +49,7 @@ import static org.junit.Assert.assertThat;
 public class CheckpointConfigFromConfigurationTest {
 
     @Parameterized.Parameters(name = "{0}")
-    public static Collection<TestSpec> specs() {
+    public static Collection<TestSpec<?>> specs() {
         return Arrays.asList(
                 TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE)
                         .whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
@@ -96,10 +103,82 @@ public class CheckpointConfigFromConfigurationTest {
                         .whenSetFromFile("execution.checkpointing.unaligned", "true")
                         .viaSetter(CheckpointConfig::enableUnalignedCheckpoints)
                         .getterVia(CheckpointConfig::isUnalignedCheckpointsEnabled)
-                        .nonDefaultValue(true));
+                        .nonDefaultValue(true),
+                TestSpec.testValue(
+                                (CheckpointStorage)
+                                        new FileSystemCheckpointStorage(
+                                                "file:///path/to/checkpoint/dir"))
+                        .whenSetFromFile(
+                                CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
+                                "file:///path/to/checkpoint/dir")
+                        .viaSetter(CheckpointConfig::setCheckpointStorage)
+                        .getterVia(CheckpointConfig::getCheckpointStorage)
+                        .nonDefaultValue(
+                                new FileSystemCheckpointStorage("file:///path/to/checkpoint/dir"))
+                        .customMatcher(FileSystemCheckpointStorageMatcher::new));
     }
 
-    @Parameterized.Parameter public TestSpec spec;
+    /**
+     * Matches a {@link CheckpointStorage} that is exactly a {@link FileSystemCheckpointStorage}
+     * (no subclasses) whose checkpoint path equals that of the storage given to the constructor.
+     */
+    private static class FileSystemCheckpointStorageMatcher
+            extends TypeSafeMatcher<CheckpointStorage> {
+        private static final Class<FileSystemCheckpointStorage> EXPECTED_CHECKPOINT_STORAGE_CLASS =
+                FileSystemCheckpointStorage.class;
+        private final FileSystemCheckpointStorage fileSystemCheckpointStorageFromSetter;
+
+        public FileSystemCheckpointStorageMatcher(CheckpointStorage checkpointStorageFromSetter) {
+            Preconditions.checkArgument(
+                    checkpointStorageFromSetter
+                            .getClass()
+                            .equals(EXPECTED_CHECKPOINT_STORAGE_CLASS));
+            this.fileSystemCheckpointStorageFromSetter =
+                    (FileSystemCheckpointStorage) checkpointStorageFromSetter;
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            describeObject(fileSystemCheckpointStorageFromSetter, description);
+        }
+
+        @Override
+        protected void describeMismatchSafely(
+                CheckpointStorage checkpointStorageFromFile, Description description) {
+            if (!checkpointStorageFromFile.getClass().equals(EXPECTED_CHECKPOINT_STORAGE_CLASS)) {
+                description.appendText(
+                        "Passed object is not of type "
+                                + EXPECTED_CHECKPOINT_STORAGE_CLASS.getCanonicalName());
+            } else {
+                describeObject(
+                        (FileSystemCheckpointStorage) checkpointStorageFromFile, description);
+            }
+            super.describeMismatchSafely(checkpointStorageFromFile, description);
+        }
+
+        private static void describeObject(
+                FileSystemCheckpointStorage fileSystemCheckpointStorage, Description description) {
+            description
+                    .appendText(EXPECTED_CHECKPOINT_STORAGE_CLASS.getCanonicalName())
+                    .appendText("(")
+                    .appendText(fileSystemCheckpointStorage.getCheckpointPath().toString())
+                    .appendText(")");
+        }
+
+        @Override
+        protected boolean matchesSafely(CheckpointStorage checkpointStorageFromFile) {
+            if (!checkpointStorageFromFile.getClass().equals(EXPECTED_CHECKPOINT_STORAGE_CLASS)) {
+                return false;
+            }
+            final FileSystemCheckpointStorage fileSystemCheckpointStorageFromFile =
+                    (FileSystemCheckpointStorage) checkpointStorageFromFile;
+            return fileSystemCheckpointStorageFromFile
+                    .getCheckpointPath()
+                    .equals(fileSystemCheckpointStorageFromSetter.getCheckpointPath());
+        }
+    }
+
+    @Parameterized.Parameter public TestSpec<?> spec;
 
     @Test
     public void testLoadingFromConfiguration() {
@@ -133,6 +212,8 @@ public class CheckpointConfigFromConfigurationTest {
         private BiConsumer<CheckpointConfig, T> setter;
         private Function<CheckpointConfig, T> getter;
 
+        private Function<T, Matcher<T>> createMatcher = CoreMatchers::equalTo;
+
         private TestSpec(T value) {
             this.objectValue = value;
         }
@@ -162,6 +243,11 @@ public class CheckpointConfigFromConfigurationTest {
             return this;
         }
 
+        public TestSpec<T> customMatcher(Function<T, Matcher<T>> customMatcher) {
+            this.createMatcher = customMatcher;
+            return this;
+        }
+
         public void setValue(CheckpointConfig config) {
             setter.accept(config, objectValue);
         }
@@ -172,11 +258,13 @@ public class CheckpointConfigFromConfigurationTest {
 
         public void assertEqual(
                 CheckpointConfig configFromFile, CheckpointConfig configFromSetters) {
-            assertThat(getter.apply(configFromFile), equalTo(getter.apply(configFromSetters)));
+            assertThat(
+                    getter.apply(configFromFile),
+                    createMatcher.apply(getter.apply(configFromSetters)));
         }
 
         public void assertEqualNonDefault(CheckpointConfig configFromFile) {
-            assertThat(getter.apply(configFromFile), equalTo(nonDefaultValue));
+            assertThat(getter.apply(configFromFile), createMatcher.apply(nonDefaultValue));
         }
 
         @Override