You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/06 04:23:53 UTC
[7/7] flink git commit: [FLINK-7041] Deserialize StateBackend from JobCheckpointingSettings with user classloader
[FLINK-7041] Deserialize StateBackend from JobCheckpointingSettings with user classloader
This closes #4232.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78303d4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78303d4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78303d4e
Branch: refs/heads/master
Commit: 78303d4e26d1fb3dffaa44b49ad5b22bc24db875
Parents: 5c6d797
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Jun 30 11:19:30 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Jul 6 12:11:57 2017 +0800
----------------------------------------------------------------------
.../executiongraph/ExecutionGraphBuilder.java | 16 +++---
.../tasks/JobCheckpointingSettings.java | 8 +--
.../CheckpointSettingsSerializableTest.java | 60 +++++++++++++++++++-
.../tasks/JobCheckpointingSettingsTest.java | 6 +-
.../api/graph/StreamingJobGraphGenerator.java | 18 +++++-
5 files changed, 92 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index d681640..c8ecd3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -223,19 +223,21 @@ public class ExecutionGraphBuilder {
// if specified in the application, use from there, otherwise load from configuration
final StateBackend metadataBackend;
- final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
+ final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
if (applicationConfiguredBackend != null) {
- metadataBackend = applicationConfiguredBackend;
+ try {
+ metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e);
+ }
log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
- applicationConfiguredBackend);
- }
- else {
+ metadataBackend);
+ } else {
try {
metadataBackend = AbstractStateBackend
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
- }
- catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
+ } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
index a30a2ba..cc97e1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
@@ -56,7 +56,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
/** The default state backend, if configured by the user in the job */
@Nullable
- private final StateBackend defaultStateBackend;
+ private final SerializedValue<StateBackend> defaultStateBackend;
/** (Factories for) hooks that are executed on the checkpoint coordinator */
@Nullable
@@ -80,7 +80,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
ExternalizedCheckpointSettings externalizedCheckpointSettings,
- @Nullable StateBackend defaultStateBackend,
+ @Nullable SerializedValue<StateBackend> defaultStateBackend,
boolean isExactlyOnce) {
this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm,
@@ -97,7 +97,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
ExternalizedCheckpointSettings externalizedCheckpointSettings,
- @Nullable StateBackend defaultStateBackend,
+ @Nullable SerializedValue<StateBackend> defaultStateBackend,
@Nullable SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks,
boolean isExactlyOnce) {
@@ -155,7 +155,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
}
@Nullable
- public StateBackend getDefaultStateBackend() {
+ public SerializedValue<StateBackend> getDefaultStateBackend() {
return defaultStateBackend;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 0246180..f597757 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -18,11 +18,15 @@
package org.apache.flink.runtime.checkpoint;
+import java.io.IOException;
+import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -31,6 +35,12 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -43,6 +53,7 @@ import java.net.URLClassLoader;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -53,7 +64,7 @@ import static org.mockito.Mockito.when;
public class CheckpointSettingsSerializableTest extends TestLogger {
@Test
- public void testClassLoaderForCheckpointHooks() throws Exception {
+ public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception {
final ClassLoader classLoader = new URLClassLoader(new URL[0], getClass().getClassLoader());
final Serializable outOfClassPath = CommonTestUtils.createObjectForClassNotInClassPath(classLoader);
@@ -70,7 +81,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
0L,
1,
ExternalizedCheckpointSettings.none(),
- null,
+ new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
serHooks,
true);
@@ -97,6 +108,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
log);
assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
+ assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend);
}
// ------------------------------------------------------------------------
@@ -119,4 +131,48 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
return hook;
}
}
+
+ private static final class CustomStateBackend implements StateBackend {
+
+ /**
+ * Simulate a custom option that is not in the normal classpath.
+ */
+ private Serializable customOption;
+
+ public CustomStateBackend(Serializable customOption) {
+ this.customOption = customOption;
+ }
+
+ @Override
+ public CheckpointStreamFactory createStreamFactory(
+ JobID jobId, String operatorIdentifier) throws IOException {
+ return null;
+ }
+
+ @Override
+ public CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ @Nullable String targetLocation) throws IOException {
+ return null;
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry) throws Exception {
+ return null;
+ }
+
+ @Override
+ public OperatorStateBackend createOperatorStateBackend(
+ Environment env, String operatorIdentifier) throws Exception {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index c3524fa..097c296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import java.util.Arrays;
@@ -45,7 +47,7 @@ public class JobCheckpointingSettingsTest {
112,
12,
ExternalizedCheckpointSettings.externalizeCheckpoints(true),
- new MemoryStateBackend(),
+ new SerializedValue<StateBackend>(new MemoryStateBackend()),
false);
JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
@@ -60,6 +62,6 @@ public class JobCheckpointingSettingsTest {
assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
assertNotNull(copy.getDefaultStateBackend());
- assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class);
+ assertTrue(copy.getDefaultStateBackend().deserializeValue(this.getClass().getClassLoader()).getClass() == MemoryStateBackend.class);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 3008a43..e70962b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -659,6 +660,21 @@ public class StreamingJobGraphGenerator {
}
}
+ // because the state backend can contain user-defined code, it needs to be stored as
+ // an eagerly serialized value
+ final SerializedValue<StateBackend> serializedStateBackend;
+ if (streamGraph.getStateBackend() == null) {
+ serializedStateBackend = null;
+ } else {
+ try {
+ serializedStateBackend =
+ new SerializedValue<StateBackend>(streamGraph.getStateBackend());
+ }
+ catch (IOException e) {
+ throw new FlinkRuntimeException("State backend is not serializable", e);
+ }
+ }
+
// --- done, put it all together ---
JobCheckpointingSettings settings = new JobCheckpointingSettings(
@@ -666,7 +682,7 @@ public class StreamingJobGraphGenerator {
cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
cfg.getMaxConcurrentCheckpoints(),
externalizedCheckpointSettings,
- streamGraph.getStateBackend(),
+ serializedStateBackend,
serializedHooks,
isExactlyOnce);