You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/21 08:51:53 UTC
[flink] branch release-1.13 updated: [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 1fffd9d [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
1fffd9d is described below
commit 1fffd9d9d7435872f5b10975a9cbf390fd126536
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed May 19 09:21:11 2021 +0200
[FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
---
.../runtime/jobgraph/SavepointRestoreSettings.java | 7 ++--
.../streaming/api/graph/StreamGraphGenerator.java | 3 +-
.../api/graph/StreamGraphGeneratorTest.java | 43 ++++++++++++++++++++++
3 files changed, 49 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 16a0460..5aa7036 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobgraph;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import java.io.Serializable;
@@ -153,10 +154,10 @@ public class SavepointRestoreSettings implements Serializable {
}
}
- public static SavepointRestoreSettings fromConfiguration(final Configuration configuration) {
- final String savepointPath = configuration.getString(SavepointConfigOptions.SAVEPOINT_PATH);
+ public static SavepointRestoreSettings fromConfiguration(final ReadableConfig configuration) {
+ final String savepointPath = configuration.get(SavepointConfigOptions.SAVEPOINT_PATH);
final boolean allowNonRestored =
- configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
+ configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
return savepointPath == null
? SavepointRestoreSettings.none()
: SavepointRestoreSettings.forPath(savepointPath, allowNonRestored);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index c8d0775..d5f2860 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -157,7 +157,7 @@ public class StreamGraphGenerator {
private String jobName = DEFAULT_JOB_NAME;
- private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
+ private SavepointRestoreSettings savepointRestoreSettings;
private long defaultBufferTimeout = StreamingJobGraphGenerator.UNDEFINED_NETWORK_BUFFER_TIMEOUT;
@@ -228,6 +228,7 @@ public class StreamGraphGenerator {
this.checkpointConfig = new CheckpointConfig(checkpointConfig);
this.configuration = checkNotNull(configuration);
this.checkpointStorage = this.checkpointConfig.getCheckpointStorage();
+ this.savepointRestoreSettings = SavepointRestoreSettings.fromConfiguration(configuration);
}
public StreamGraphGenerator setRuntimeExecutionMode(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index dc01b60..555f5e5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -26,8 +26,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -655,6 +657,47 @@ public class StreamGraphGeneratorTest extends TestLogger {
equalTo(resourceProfile3));
}
+ @Test
+ public void testSettingSavepointRestoreSettings() {
+ Configuration config = new Configuration();
+ config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint");
+
+ final StreamGraph streamGraph =
+ new StreamGraphGenerator(
+ Collections.emptyList(),
+ new ExecutionConfig(),
+ new CheckpointConfig(),
+ config)
+ .generate();
+
+ SavepointRestoreSettings savepointRestoreSettings =
+ streamGraph.getSavepointRestoreSettings();
+ assertThat(
+ savepointRestoreSettings,
+ equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint")));
+ }
+
+ @Test
+ public void testSettingSavepointRestoreSettingsSetterOverrides() {
+ Configuration config = new Configuration();
+ config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint");
+
+ StreamGraphGenerator generator =
+ new StreamGraphGenerator(
+ Collections.emptyList(),
+ new ExecutionConfig(),
+ new CheckpointConfig(),
+ config);
+ generator.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("/tmp/savepoint1"));
+ final StreamGraph streamGraph = generator.generate();
+
+ SavepointRestoreSettings savepointRestoreSettings =
+ streamGraph.getSavepointRestoreSettings();
+ assertThat(
+ savepointRestoreSettings,
+ equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint1")));
+ }
+
private static class OutputTypeConfigurableFunction<T>
implements OutputTypeConfigurable<T>, Function {
private TypeInformation<T> typeInformation;