You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/21 09:23:29 UTC

[flink] branch release-1.12 updated: [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new c35df7c  [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
c35df7c is described below

commit c35df7c2462536cf704e86cabfdffdb6863e8f55
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed May 19 09:21:11 2021 +0200

    [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
---
 .../runtime/jobgraph/SavepointRestoreSettings.java |  7 ++--
 .../streaming/api/graph/StreamGraphGenerator.java  |  3 +-
 .../api/graph/StreamGraphGeneratorTest.java        | 44 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 16a0460..5aa7036 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 import java.io.Serializable;
 
@@ -153,10 +154,10 @@ public class SavepointRestoreSettings implements Serializable {
         }
     }
 
-    public static SavepointRestoreSettings fromConfiguration(final Configuration configuration) {
-        final String savepointPath = configuration.getString(SavepointConfigOptions.SAVEPOINT_PATH);
+    public static SavepointRestoreSettings fromConfiguration(final ReadableConfig configuration) {
+        final String savepointPath = configuration.get(SavepointConfigOptions.SAVEPOINT_PATH);
         final boolean allowNonRestored =
-                configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
+                configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
         return savepointPath == null
                 ? SavepointRestoreSettings.none()
                 : SavepointRestoreSettings.forPath(savepointPath, allowNonRestored);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 0092517..a9917d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -145,7 +145,7 @@ public class StreamGraphGenerator {
 
     private String jobName = DEFAULT_JOB_NAME;
 
-    private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
+    private SavepointRestoreSettings savepointRestoreSettings;
 
     private long defaultBufferTimeout = StreamingJobGraphGenerator.UNDEFINED_NETWORK_BUFFER_TIMEOUT;
 
@@ -215,6 +215,7 @@ public class StreamGraphGenerator {
         this.executionConfig = checkNotNull(executionConfig);
         this.checkpointConfig = new CheckpointConfig(checkpointConfig);
         this.configuration = checkNotNull(configuration);
+        this.savepointRestoreSettings = SavepointRestoreSettings.fromConfiguration(configuration);
     }
 
     public StreamGraphGenerator setRuntimeExecutionMode(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 14b6e0b..9e8fb13 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -24,7 +24,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -68,6 +70,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -525,6 +528,47 @@ public class StreamGraphGeneratorTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testSettingSavepointRestoreSettings() {
+        Configuration config = new Configuration();
+        config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint");
+
+        final StreamGraph streamGraph =
+                new StreamGraphGenerator(
+                                Collections.emptyList(),
+                                new ExecutionConfig(),
+                                new CheckpointConfig(),
+                                config)
+                        .generate();
+
+        SavepointRestoreSettings savepointRestoreSettings =
+                streamGraph.getSavepointRestoreSettings();
+        assertThat(
+                savepointRestoreSettings,
+                equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint")));
+    }
+
+    @Test
+    public void testSettingSavepointRestoreSettingsSetterOverrides() {
+        Configuration config = new Configuration();
+        config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint");
+
+        StreamGraphGenerator generator =
+                new StreamGraphGenerator(
+                        Collections.emptyList(),
+                        new ExecutionConfig(),
+                        new CheckpointConfig(),
+                        config);
+        generator.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("/tmp/savepoint1"));
+        final StreamGraph streamGraph = generator.generate();
+
+        SavepointRestoreSettings savepointRestoreSettings =
+                streamGraph.getSavepointRestoreSettings();
+        assertThat(
+                savepointRestoreSettings,
+                equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint1")));
+    }
+
     static class OutputTypeConfigurableOperationWithTwoInputs
             extends AbstractStreamOperator<Integer>
             implements TwoInputStreamOperator<Integer, Integer, Integer>,