You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/13 15:40:15 UTC

[5/8] flink git commit: [FLINK-4735] [cluster management] Implements some job execution related RPC calls on the JobManager

[FLINK-4735] [cluster management] Implements some job execution related RPC calls on the JobManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21b9f16b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21b9f16b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21b9f16b

Branch: refs/heads/flip-6
Commit: 21b9f16bb09785f72a7592925d3bb50160636797
Parents: 35a44da
Author: Kurt Young <yk...@gmail.com>
Authored: Tue Oct 4 23:00:22 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 13 16:25:49 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 246 +++++++++++++++++--
 .../runtime/jobmaster/JobMasterGateway.java     |  93 ++++++-
 .../jobmaster/message/ClassloadingProps.java    |  68 +++++
 .../message/DisposeSavepointResponse.java       |  49 ++++
 .../message/TriggerSavepointResponse.java       |  74 ++++++
 5 files changed, 507 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21b9f16b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 8f3a342..3b8fc97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -39,8 +40,11 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -61,10 +65,20 @@ import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
+import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
@@ -72,7 +86,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
@@ -520,22 +534,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			throw new ExecutionGraphException("The execution attempt " +
 				taskExecutionState.getID() + " was not found.");
 		}
-
-	}
-
-	//----------------------------------------------------------------------------------------------\u2028
-	// Internal methods\u2028
-	// ----------------------------------------------------------------------------------------------\u2028\u2028
-
-	private void handleFatalError(final Throwable cause) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
-				shutDown();
-				jobCompletionActions.onFatalError(cause);
-			}
-		});
 	}
 
 	@RpcMethod
@@ -631,10 +629,220 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		throw new UnsupportedOperationException();
 	}
 
+	@RpcMethod
+	public void resourceRemoved(final ResourceID resourceId, final String message) {
+		// TODO: remove resource from slot pool
+	}
+
+	@RpcMethod
+	public void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge) {
+		if (executionGraph != null) {
+			final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+			if (checkpointCoordinator != null) {
+				getRpcService().execute(new Runnable() {
+					@Override
+					public void run() {
+						try {
+							if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
+								log.info("Received message for non-existing checkpoint {}.",
+									acknowledge.getCheckpointId());
+							}
+						} catch (Exception e) {
+							log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e);
+						}
+					}
+				});
+			}
+			else {
+				log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
+					jobGraph.getJobID());
+			}
+		} else {
+			log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
+		}
+	}
+
+	@RpcMethod
+	public void declineCheckpoint(final DeclineCheckpoint decline) {
+		if (executionGraph != null) {
+			final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+			if (checkpointCoordinator != null) {
+				getRpcService().execute(new Runnable() {
+					@Override
+					public void run() {
+						try {
+							if (!checkpointCoordinator.receiveDeclineMessage(decline)) {
+								log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId());
+							}
+						} catch (Exception e) {
+							log.error("Error in CheckpointCoordinator while processing {}", decline, e);
+						}
+					}
+				});
+			} else {
+				log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
+					jobGraph.getJobID());
+			}
+		} else {
+			log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
+		}
+	}
+
+	@RpcMethod
+	public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception {
+		if (executionGraph != null) {
+			if (log.isDebugEnabled()) {
+				log.debug("Lookup key-value state for job {} with registration " +
+					"name {}.", jobGraph.getJobID(), registrationName);
+			}
+
+			final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
+			final KvStateLocation location = registry.getKvStateLocation(registrationName);
+			if (location != null) {
+				return location;
+			} else {
+				throw new UnknownKvStateLocation(registrationName);
+			}
+		} else {
+			throw new IllegalStateException("Received lookup KvState location request for unavailable job " +
+				jobGraph.getJobID());
+		}
+	}
+
+	@RpcMethod
+	public void notifyKvStateRegistered(
+		final JobVertexID jobVertexId,
+		final KeyGroupRange keyGroupRange,
+		final String registrationName,
+		final KvStateID kvStateId,
+		final KvStateServerAddress kvStateServerAddress)
+	{
+		if (executionGraph != null) {
+			if (log.isDebugEnabled()) {
+				log.debug("Key value state registered for job {} under name {}.",
+					jobGraph.getJobID(), registrationName);
+			}
+			try {
+				executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress
+				);
+			} catch (Exception e) {
+				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
+			}
+		} else {
+			log.error("Received notify KvState registered request for unavailable job " + jobGraph.getJobID());
+		}
+	}
+
+	@RpcMethod
+	public void notifyKvStateUnregistered(
+		JobVertexID jobVertexId,
+		KeyGroupRange keyGroupRange,
+		String registrationName)
+	{
+		if (executionGraph != null) {
+			if (log.isDebugEnabled()) {
+				log.debug("Key value state unregistered for job {} under name {}.",
+					jobGraph.getJobID(), registrationName);
+			}
+			try {
+				executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+					jobVertexId, keyGroupRange, registrationName
+				);
+			} catch (Exception e) {
+				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
+			}
+		} else {
+			log.error("Received notify KvState unregistered request for unavailable job " + jobGraph.getJobID());
+		}
+	}
+
+	@RpcMethod
+	public Future<TriggerSavepointResponse> triggerSavepoint() throws Exception {
+		if (executionGraph != null) {
+			final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+			if (checkpointCoordinator != null) {
+				try {
+					Future<String> savepointFuture = new FlinkFuture<>(
+						checkpointCoordinator.triggerSavepoint(System.currentTimeMillis()));
+
+					return savepointFuture.handleAsync(new BiFunction<String, Throwable, TriggerSavepointResponse>() {
+						@Override
+						public TriggerSavepointResponse apply(String savepointPath, Throwable throwable) {
+							if (throwable == null) {
+								return new TriggerSavepointResponse.Success(jobGraph.getJobID(), savepointPath);
+							}
+							else {
+								return new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+									new Exception("Failed to complete savepoint", throwable));
+							}
+						}
+					}, getMainThreadExecutor());
+
+				} catch (Exception e) {
+					FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>();
+					future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+						new Exception("Failed to trigger savepoint", e)));
+					return future;
+				}
+			} else {
+				FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>();
+				future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+					new IllegalStateException("Checkpointing disabled. You can enable it via the execution " +
+						"environment of your job.")));
+				return future;
+			}
+		} else {
+			FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>();
+			future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+				new IllegalArgumentException("Received trigger savepoint request for unavailable job " +
+					jobGraph.getJobID())));
+			return future;
+		}
+	}
+
+	@RpcMethod
+	public DisposeSavepointResponse disposeSavepoint(final String savepointPath) {
+		try {
+			log.info("Disposing savepoint at {}.", savepointPath);
+
+			// check whether the savepoint exists
+			savepointStore.loadSavepoint(savepointPath);
+
+			savepointStore.disposeSavepoint(savepointPath);
+			return new DisposeSavepointResponse.Success();
+		} catch (Exception e) {
+			log.error("Failed to dispose savepoint at {}.", savepointPath, e);
+			return new DisposeSavepointResponse.Failure(e);
+		}
+	}
+
+	@RpcMethod
+	public ClassloadingProps requestClassloadingProps() throws Exception {
+		if (executionGraph != null) {
+			return new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+				executionGraph.getRequiredJarFiles(),
+				executionGraph.getRequiredClasspaths());
+		} else {
+			throw new Exception("Received classloading props request for unavailable job " + jobGraph.getJobID());
+		}
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------
 
+	private void handleFatalError(final Throwable cause) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
+				shutDown();
+				jobCompletionActions.onFatalError(cause);
+			}
+		});
+	}
+
 	// TODO - wrap this as StatusListenerMessenger's callback with rpc main thread
 	private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
 		final JobID jobID = executionGraph.getJobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/21b9f16b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index e3e57d4..4b51258 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -30,8 +28,18 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
+import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 import java.util.UUID;
@@ -110,4 +118,81 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param resourceID identifying the TaskManager to disconnect
 	 */
 	void disconnectTaskManager(ResourceID resourceID);
+	void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
+
+	/**
+	 * Notifies the JobManager about the removal of a resource.
+	 *
+	 * @param resourceId The ID under which the resource is registered.
+	 * @param message    Optional message with details, for logging and debugging.
+	 */
+
+	void resourceRemoved(final ResourceID resourceId, final String message);
+
+	/**
+	 * Notifies the JobManager that the checkpoint of an individual task is completed.
+	 *
+	 * @param acknowledge The acknowledge message of the checkpoint
+	 */
+	void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge);
+
+	/**
+	 * Notifies the JobManager that a checkpoint request could not be heeded.
+	 * This can happen if a Task is already in RUNNING state but is internally not yet ready to perform checkpoints.
+	 *
+	 * @param decline The decline message of the checkpoint
+	 */
+	void declineCheckpoint(final DeclineCheckpoint decline);
+
+	/**
+	 * Requests a {@link KvStateLocation} for the specified {@link KvState} registration name.
+	 *
+	 * @param registrationName Name under which the KvState has been registered.
+	 * @return Future of the requested {@link KvState} location
+	 */
+	Future<KvStateLocation> lookupKvStateLocation(final String registrationName) throws Exception;
+
+	/**
+	 * @param jobVertexId          JobVertexID the KvState instance belongs to.
+	 * @param keyGroupRange        Key group range the KvState instance belongs to.
+	 * @param registrationName     Name under which the KvState has been registered.
+	 * @param kvStateId            ID of the registered KvState instance.
+	 * @param kvStateServerAddress Server address where to find the KvState instance.
+	 */
+	void notifyKvStateRegistered(
+		final JobVertexID jobVertexId,
+		final KeyGroupRange keyGroupRange,
+		final String registrationName,
+		final KvStateID kvStateId,
+		final KvStateServerAddress kvStateServerAddress);
+
+	/**
+	 * @param jobVertexId      JobVertexID the KvState instance belongs to.
+	 * @param keyGroupRange    Key group index the KvState instance belongs to.
+	 * @param registrationName Name under which the KvState has been registered.
+	 */
+	void notifyKvStateUnregistered(
+		JobVertexID jobVertexId,
+		KeyGroupRange keyGroupRange,
+		String registrationName);
+
+	/**
+	 * Notifies the JobManager to trigger a savepoint for this job.
+	 *
+	 * @return Future of the savepoint trigger response.
+	 */
+	Future<TriggerSavepointResponse> triggerSavepoint();
+
+	/**
+	 * Notifies the Jobmanager to dispose specified savepoint.
+	 *
+	 * @param savepointPath The path of the savepoint.
+	 * @return The future of the savepoint disponse response.
+	 */
+	Future<DisposeSavepointResponse> disposeSavepoint(final String savepointPath);
+
+	/**
+	 * Request the classloading props of this job.
+	 */
+	Future<ClassloadingProps> requestClassloadingProps();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21b9f16b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
new file mode 100644
index 0000000..2d670b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import org.apache.flink.runtime.blob.BlobKey;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * The response of classloading props request to JobManager.
+ */
+public class ClassloadingProps implements Serializable {
+
+	private static final long serialVersionUID = -3282341310808511823L;
+
+	private final int blobManagerPort;
+
+	private final List<BlobKey> requiredJarFiles;
+
+	private final List<URL> requiredClasspaths;
+
+	/**
+	 * Constructor of ClassloadingProps.
+	 *
+	 * @param blobManagerPort    The port of the blobManager
+	 * @param requiredJarFiles   The blob keys of the required jar files
+	 * @param requiredClasspaths The urls of the required classpaths
+	 */
+	public ClassloadingProps(
+		final int blobManagerPort,
+		final List<BlobKey> requiredJarFiles,
+		final List<URL> requiredClasspaths)
+	{
+		this.blobManagerPort = blobManagerPort;
+		this.requiredJarFiles = requiredJarFiles;
+		this.requiredClasspaths = requiredClasspaths;
+	}
+
+	public int getBlobManagerPort() {
+		return blobManagerPort;
+	}
+
+	public List<BlobKey> getRequiredJarFiles() {
+		return requiredJarFiles;
+	}
+
+	public List<URL> getRequiredClasspaths() {
+		return requiredClasspaths;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21b9f16b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
new file mode 100644
index 0000000..42bfc71
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import java.io.Serializable;
+
+/**
+ * The response of the dispose savepoint request to JobManager.
+ */
+public abstract class DisposeSavepointResponse implements Serializable {
+
+	private static final long serialVersionUID = 6008792963949369567L;
+
+	public static class Success extends DisposeSavepointResponse implements Serializable {
+
+		private static final long serialVersionUID = 1572462960008711415L;
+	}
+
+	public static class Failure extends DisposeSavepointResponse implements Serializable {
+
+		private static final long serialVersionUID = -7505308325483022458L;
+
+		private final Throwable cause;
+
+		public Failure(final Throwable cause) {
+			this.cause = cause;
+		}
+
+		public Throwable getCause() {
+			return cause;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21b9f16b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
new file mode 100644
index 0000000..0b0edc5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+
+/**
+ * The response of the trigger savepoint request to JobManager.
+ */
+public abstract class TriggerSavepointResponse implements Serializable {
+
+	private static final long serialVersionUID = 3139327824611807707L;
+
+	private final JobID jobID;
+
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	public TriggerSavepointResponse(final JobID jobID) {
+		this.jobID = jobID;
+	}
+
+	public static class Success extends TriggerSavepointResponse implements Serializable {
+
+		private static final long serialVersionUID = -1100637460388881776L;
+
+		private final String savepointPath;
+
+		public Success(final JobID jobID, final String savepointPath) {
+			super(jobID);
+			this.savepointPath = savepointPath;
+		}
+
+		public String getSavepointPath() {
+			return savepointPath;
+		}
+	}
+
+	public static class Failure extends TriggerSavepointResponse implements Serializable {
+
+		private static final long serialVersionUID = -1668479003490615139L;
+
+		private final Throwable cause;
+
+		public Failure(final JobID jobID, final Throwable cause) {
+			super(jobID);
+			this.cause = cause;
+		}
+
+		public Throwable getCause() {
+			return cause;
+		}
+	}
+}
+