You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/21 09:23:29 UTC
[flink] branch release-1.12 updated: [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new c35df7c [FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
c35df7c is described below
commit c35df7c2462536cf704e86cabfdffdb6863e8f55
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed May 19 09:21:11 2021 +0200
[FLINK-22708][config] Propagate savepoint settings from StreamExecutionEnvironment to StreamGraph
---
.../runtime/jobgraph/SavepointRestoreSettings.java | 7 ++--
.../streaming/api/graph/StreamGraphGenerator.java | 3 +-
.../api/graph/StreamGraphGeneratorTest.java | 44 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 16a0460..5aa7036 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobgraph;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import java.io.Serializable;
@@ -153,10 +154,10 @@ public class SavepointRestoreSettings implements Serializable {
}
}
- public static SavepointRestoreSettings fromConfiguration(final Configuration configuration) {
- final String savepointPath = configuration.getString(SavepointConfigOptions.SAVEPOINT_PATH);
+ public static SavepointRestoreSettings fromConfiguration(final ReadableConfig configuration) {
+ final String savepointPath = configuration.get(SavepointConfigOptions.SAVEPOINT_PATH);
final boolean allowNonRestored =
- configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
+ configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
return savepointPath == null
? SavepointRestoreSettings.none()
: SavepointRestoreSettings.forPath(savepointPath, allowNonRestored);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 0092517..a9917d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -145,7 +145,7 @@ public class StreamGraphGenerator {
private String jobName = DEFAULT_JOB_NAME;
- private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
+ private SavepointRestoreSettings savepointRestoreSettings;
private long defaultBufferTimeout = StreamingJobGraphGenerator.UNDEFINED_NETWORK_BUFFER_TIMEOUT;
@@ -215,6 +215,7 @@ public class StreamGraphGenerator {
this.executionConfig = checkNotNull(executionConfig);
this.checkpointConfig = new CheckpointConfig(checkpointConfig);
this.configuration = checkNotNull(configuration);
+ this.savepointRestoreSettings = SavepointRestoreSettings.fromConfiguration(configuration);
}
public StreamGraphGenerator setRuntimeExecutionMode(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 14b6e0b..9e8fb13 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -24,7 +24,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -68,6 +70,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -525,6 +528,47 @@ public class StreamGraphGeneratorTest extends TestLogger {
}
}
+ @Test
+ public void testSettingSavepointRestoreSettings() {
+ Configuration config = new Configuration();
+ config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint");
+
+ final StreamGraph streamGraph =
+ new StreamGraphGenerator(
+ Collections.emptyList(),
+ new ExecutionConfig(),
+ new CheckpointConfig(),
+ config)
+ .generate();
+
+ SavepointRestoreSettings savepointRestoreSettings =
+ streamGraph.getSavepointRestoreSettings();
+ assertThat(
+ savepointRestoreSettings,
+ equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint")));
+ }
+
+ @Test
+ public void testSettingSavepointRestoreSettingsSetterOverrides() {
+ Configuration config = new Configuration();
+ config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint");
+
+ StreamGraphGenerator generator =
+ new StreamGraphGenerator(
+ Collections.emptyList(),
+ new ExecutionConfig(),
+ new CheckpointConfig(),
+ config);
+ generator.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("/tmp/savepoint1"));
+ final StreamGraph streamGraph = generator.generate();
+
+ SavepointRestoreSettings savepointRestoreSettings =
+ streamGraph.getSavepointRestoreSettings();
+ assertThat(
+ savepointRestoreSettings,
+ equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint1")));
+ }
+
static class OutputTypeConfigurableOperationWithTwoInputs
extends AbstractStreamOperator<Integer>
implements TwoInputStreamOperator<Integer, Integer, Integer>,