You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") in the original page and is not preserved in this text.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/06 04:23:53 UTC

[7/7] flink git commit: [FLINK-7041] Deserialize StateBackend from JobCheckpointingSettings with user classloader

[FLINK-7041] Deserialize StateBackend from JobCheckpointingSettings with user classloader

This closes #4232.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78303d4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78303d4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78303d4e

Branch: refs/heads/master
Commit: 78303d4e26d1fb3dffaa44b49ad5b22bc24db875
Parents: 5c6d797
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Jun 30 11:19:30 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Jul 6 12:11:57 2017 +0800

----------------------------------------------------------------------
 .../executiongraph/ExecutionGraphBuilder.java   | 16 +++---
 .../tasks/JobCheckpointingSettings.java         |  8 +--
 .../CheckpointSettingsSerializableTest.java     | 60 +++++++++++++++++++-
 .../tasks/JobCheckpointingSettingsTest.java     |  6 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 18 +++++-
 5 files changed, 92 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index d681640..c8ecd3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -223,19 +223,21 @@ public class ExecutionGraphBuilder {
 			// if specified in the application, use from there, otherwise load from configuration
 			final StateBackend metadataBackend;
 
-			final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
+			final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
 			if (applicationConfiguredBackend != null) {
-				metadataBackend = applicationConfiguredBackend;
+				try {
+					metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader);
+				} catch (IOException | ClassNotFoundException e) {
+					throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e);
+				}
 
 				log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
-						applicationConfiguredBackend);
-			}
-			else {
+					metadataBackend);
+			} else {
 				try {
 					metadataBackend = AbstractStateBackend
 							.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
-				}
-				catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
+				} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
 					throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
index a30a2ba..cc97e1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
@@ -56,7 +56,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 
 	/** The default state backend, if configured by the user in the job */
 	@Nullable
-	private final StateBackend defaultStateBackend;
+	private final SerializedValue<StateBackend> defaultStateBackend;
 
 	/** (Factories for) hooks that are executed on the checkpoint coordinator */
 	@Nullable
@@ -80,7 +80,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
 			ExternalizedCheckpointSettings externalizedCheckpointSettings,
-			@Nullable StateBackend defaultStateBackend,
+			@Nullable SerializedValue<StateBackend> defaultStateBackend,
 			boolean isExactlyOnce) {
 
 		this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm,
@@ -97,7 +97,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
 			ExternalizedCheckpointSettings externalizedCheckpointSettings,
-			@Nullable StateBackend defaultStateBackend,
+			@Nullable SerializedValue<StateBackend> defaultStateBackend,
 			@Nullable SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks,
 			boolean isExactlyOnce) {
 
@@ -155,7 +155,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 	}
 
 	@Nullable
-	public StateBackend getDefaultStateBackend() {
+	public SerializedValue<StateBackend> getDefaultStateBackend() {
 		return defaultStateBackend;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 0246180..f597757 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import java.io.IOException;
+import javax.annotation.Nullable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -31,6 +35,12 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -43,6 +53,7 @@ import java.net.URLClassLoader;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -53,7 +64,7 @@ import static org.mockito.Mockito.when;
 public class CheckpointSettingsSerializableTest extends TestLogger {
 
 	@Test
-	public void testClassLoaderForCheckpointHooks() throws Exception {
+	public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception {
 		final ClassLoader classLoader = new URLClassLoader(new URL[0], getClass().getClassLoader());
 		final Serializable outOfClassPath = CommonTestUtils.createObjectForClassNotInClassPath(classLoader);
 
@@ -70,7 +81,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 				0L,
 				1,
 				ExternalizedCheckpointSettings.none(),
-				null,
+				new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
 				serHooks,
 				true);
 
@@ -97,6 +108,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 				log);
 
 		assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
+		assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend);
 	}
 
 	// ------------------------------------------------------------------------
@@ -119,4 +131,48 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 			return hook;
 		}
 	}
+
+	private static final class CustomStateBackend implements StateBackend {
+
+		/**
+		 * Simulate a custom option that is not in the normal classpath.
+		 */
+		private Serializable customOption;
+
+		public CustomStateBackend(Serializable customOption) {
+			this.customOption = customOption;
+		}
+
+		@Override
+		public CheckpointStreamFactory createStreamFactory(
+			JobID jobId, String operatorIdentifier) throws IOException {
+			return null;
+		}
+
+		@Override
+		public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			@Nullable String targetLocation) throws IOException {
+			return null;
+		}
+
+		@Override
+		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			int numberOfKeyGroups,
+			KeyGroupRange keyGroupRange,
+			TaskKvStateRegistry kvStateRegistry) throws Exception {
+			return null;
+		}
+
+		@Override
+		public OperatorStateBackend createOperatorStateBackend(
+			Environment env, String operatorIdentifier) throws Exception {
+			return null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index c3524fa..097c296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -45,7 +47,7 @@ public class JobCheckpointingSettingsTest {
 			112,
 			12,
 			ExternalizedCheckpointSettings.externalizeCheckpoints(true),
-			new MemoryStateBackend(),
+			new SerializedValue<StateBackend>(new MemoryStateBackend()),
 			false);
 
 		JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
@@ -60,6 +62,6 @@ public class JobCheckpointingSettingsTest {
 		assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
 		assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
 		assertNotNull(copy.getDefaultStateBackend());
-		assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class);
+		assertTrue(copy.getDefaultStateBackend().deserializeValue(this.getClass().getClassLoader()).getClass() == MemoryStateBackend.class);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78303d4e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 3008a43..e70962b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -659,6 +660,21 @@ public class StreamingJobGraphGenerator {
 			}
 		}
 
+		// because the state backend can have user-defined code, it needs to be stored as
+		// eagerly serialized value
+		final SerializedValue<StateBackend> serializedStateBackend;
+		if (streamGraph.getStateBackend() == null) {
+			serializedStateBackend = null;
+		} else {
+			try {
+				serializedStateBackend =
+					new SerializedValue<StateBackend>(streamGraph.getStateBackend());
+			}
+			catch (IOException e) {
+				throw new FlinkRuntimeException("State backend is not serializable", e);
+			}
+		}
+
 		//  --- done, put it all together ---
 
 		JobCheckpointingSettings settings = new JobCheckpointingSettings(
@@ -666,7 +682,7 @@ public class StreamingJobGraphGenerator {
 				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
 				cfg.getMaxConcurrentCheckpoints(),
 				externalizedCheckpointSettings,
-				streamGraph.getStateBackend(),
+				serializedStateBackend,
 				serializedHooks,
 				isExactlyOnce);