You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/21 08:51:53 UTC

[flink] branch release-1.13 updated: [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 1fffd9d  [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
1fffd9d is described below

commit 1fffd9d9d7435872f5b10975a9cbf390fd126536
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed May 19 09:21:11 2021 +0200

    [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
---
 .../runtime/jobgraph/SavepointRestoreSettings.java |  7 ++--
 .../streaming/api/graph/StreamGraphGenerator.java  |  3 +-
 .../api/graph/StreamGraphGeneratorTest.java        | 43 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 16a0460..5aa7036 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 import java.io.Serializable;
 
@@ -153,10 +154,10 @@ public class SavepointRestoreSettings implements Serializable {
         }
     }
 
-    public static SavepointRestoreSettings fromConfiguration(final Configuration configuration) {
-        final String savepointPath = configuration.getString(SavepointConfigOptions.SAVEPOINT_PATH);
+    public static SavepointRestoreSettings fromConfiguration(final ReadableConfig configuration) {
+        final String savepointPath = configuration.get(SavepointConfigOptions.SAVEPOINT_PATH);
         final boolean allowNonRestored =
-                configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
+                configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
         return savepointPath == null
                 ? SavepointRestoreSettings.none()
                 : SavepointRestoreSettings.forPath(savepointPath, allowNonRestored);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index c8d0775..d5f2860 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -157,7 +157,7 @@ public class StreamGraphGenerator {
 
     private String jobName = DEFAULT_JOB_NAME;
 
-    private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
+    private SavepointRestoreSettings savepointRestoreSettings;
 
     private long defaultBufferTimeout = StreamingJobGraphGenerator.UNDEFINED_NETWORK_BUFFER_TIMEOUT;
 
@@ -228,6 +228,7 @@ public class StreamGraphGenerator {
         this.checkpointConfig = new CheckpointConfig(checkpointConfig);
         this.configuration = checkNotNull(configuration);
         this.checkpointStorage = this.checkpointConfig.getCheckpointStorage();
+        this.savepointRestoreSettings = SavepointRestoreSettings.fromConfiguration(configuration);
     }
 
     public StreamGraphGenerator setRuntimeExecutionMode(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index dc01b60..555f5e5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -26,8 +26,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -655,6 +657,47 @@ public class StreamGraphGeneratorTest extends TestLogger {
                 equalTo(resourceProfile3));
     }
 
+    @Test
+    public void testSettingSavepointRestoreSettings() {
+        Configuration config = new Configuration();
+        config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint");
+
+        final StreamGraph streamGraph =
+                new StreamGraphGenerator(
+                                Collections.emptyList(),
+                                new ExecutionConfig(),
+                                new CheckpointConfig(),
+                                config)
+                        .generate();
+
+        SavepointRestoreSettings savepointRestoreSettings =
+                streamGraph.getSavepointRestoreSettings();
+        assertThat(
+                savepointRestoreSettings,
+                equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint")));
+    }
+
+    @Test
+    public void testSettingSavepointRestoreSettingsSetterOverrides() {
+        Configuration config = new Configuration();
+        config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint");
+
+        StreamGraphGenerator generator =
+                new StreamGraphGenerator(
+                        Collections.emptyList(),
+                        new ExecutionConfig(),
+                        new CheckpointConfig(),
+                        config);
+        generator.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("/tmp/savepoint1"));
+        final StreamGraph streamGraph = generator.generate();
+
+        SavepointRestoreSettings savepointRestoreSettings =
+                streamGraph.getSavepointRestoreSettings();
+        assertThat(
+                savepointRestoreSettings,
+                equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint1")));
+    }
+
     private static class OutputTypeConfigurableFunction<T>
             implements OutputTypeConfigurable<T>, Function {
         private TypeInformation<T> typeInformation;