You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/01 08:40:53 UTC
[26/50] [abbrv] flink git commit: [FLINK-4735] [cluster management]
Implements some job execution related RPC calls on the JobManager
[FLINK-4735] [cluster management] Implements some job execution related RPC calls on the JobManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94a0646c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94a0646c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94a0646c
Branch: refs/heads/flip-6
Commit: 94a0646cbbf86f3aacd979570c979c0401004be2
Parents: bf4d384
Author: Kurt Young <yk...@gmail.com>
Authored: Tue Oct 4 23:00:22 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 1 09:39:31 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/jobmaster/JobMaster.java | 246 +++++++++++++++++--
.../runtime/jobmaster/JobMasterGateway.java | 93 ++++++-
.../jobmaster/message/ClassloadingProps.java | 68 +++++
.../message/DisposeSavepointResponse.java | 49 ++++
.../message/TriggerSavepointResponse.java | 74 ++++++
5 files changed, 507 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/94a0646c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 8f3a342..3b8fc97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -39,8 +40,11 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -61,10 +65,20 @@ import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
+import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
@@ -72,7 +86,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
@@ -520,22 +534,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
throw new ExecutionGraphException("The execution attempt " +
taskExecutionState.getID() + " was not found.");
}
-
- }
-
- //----------------------------------------------------------------------------------------------\u2028
- // Internal methods\u2028
- // ----------------------------------------------------------------------------------------------\u2028\u2028
-
- private void handleFatalError(final Throwable cause) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
- shutDown();
- jobCompletionActions.onFatalError(cause);
- }
- });
}
@RpcMethod
@@ -631,10 +629,220 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
throw new UnsupportedOperationException();
}
+ @RpcMethod
+ public void resourceRemoved(final ResourceID resourceId, final String message) {
+ // TODO: remove resource from slot pool
+ }
+
+ @RpcMethod
+ public void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge) {
+ if (executionGraph != null) {
+ final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+ if (checkpointCoordinator != null) {
+ getRpcService().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
+ log.info("Received message for non-existing checkpoint {}.",
+ acknowledge.getCheckpointId());
+ }
+ } catch (Exception e) {
+ log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e);
+ }
+ }
+ });
+ }
+ else {
+ log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
+ jobGraph.getJobID());
+ }
+ } else {
+ log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
+ }
+ }
+
+ @RpcMethod
+ public void declineCheckpoint(final DeclineCheckpoint decline) { // forwards a task's checkpoint decline to the CheckpointCoordinator
+ if (executionGraph != null) {
+ final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+ if (checkpointCoordinator != null) {
+ getRpcService().execute(new Runnable() { // handed to the RpcService's executor rather than processed inline
+ @Override
+ public void run() {
+ try {
+ if (!checkpointCoordinator.receiveDeclineMessage(decline)) {
+ log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId());
+ }
+ } catch (Exception e) {
+ log.error("Error in CheckpointCoordinator while processing {}", decline, e);
+ }
+ }
+ });
+ } else {
+ log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
+ jobGraph.getJobID());
+ }
+ } else { // no execution graph: only logged, the decline message is dropped
+ log.error("Received DeclineCheckpoint for unavailable job {}", jobGraph.getJobID());
+ }
+ }
+
+ @RpcMethod
+ public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception {
+ if (executionGraph != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Lookup key-value state for job {} with registration " +
+ "name {}.", jobGraph.getJobID(), registrationName);
+ }
+
+ final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
+ final KvStateLocation location = registry.getKvStateLocation(registrationName);
+ if (location != null) {
+ return location;
+ } else {
+ throw new UnknownKvStateLocation(registrationName);
+ }
+ } else {
+ throw new IllegalStateException("Received lookup KvState location request for unavailable job " +
+ jobGraph.getJobID());
+ }
+ }
+
+ @RpcMethod
+ public void notifyKvStateRegistered(
+ final JobVertexID jobVertexId,
+ final KeyGroupRange keyGroupRange,
+ final String registrationName,
+ final KvStateID kvStateId,
+ final KvStateServerAddress kvStateServerAddress)
+ { // records the KvState in the execution graph's location registry; failures are logged, not rethrown
+ if (executionGraph != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Key value state registered for job {} under name {}.",
+ jobGraph.getJobID(), registrationName);
+ }
+ try {
+ executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+ jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress
+ );
+ } catch (Exception e) {
+ log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e); // trailing Throwable keeps the cause and stack trace
+ }
+ } else {
+ log.error("Received notify KvState registered request for unavailable job {}", jobGraph.getJobID());
+ }
+ }
+
+ @RpcMethod
+ public void notifyKvStateUnregistered(
+ JobVertexID jobVertexId,
+ KeyGroupRange keyGroupRange,
+ String registrationName)
+ { // removes the KvState from the execution graph's location registry; failures are logged, not rethrown
+ if (executionGraph != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Key value state unregistered for job {} under name {}.",
+ jobGraph.getJobID(), registrationName);
+ }
+ try {
+ executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+ jobVertexId, keyGroupRange, registrationName
+ );
+ } catch (Exception e) {
+ log.error("Failed to notify KvStateRegistry about unregistration {}.", registrationName, e); // trailing Throwable keeps the cause and stack trace
+ }
+ } else {
+ log.error("Received notify KvState unregistered request for unavailable job {}", jobGraph.getJobID());
+ }
+ }
+
+ @RpcMethod
+ public Future<TriggerSavepointResponse> triggerSavepoint() throws Exception {
+ if (executionGraph != null) {
+ final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+ if (checkpointCoordinator != null) {
+ try {
+ Future<String> savepointFuture = new FlinkFuture<>(
+ checkpointCoordinator.triggerSavepoint(System.currentTimeMillis()));
+
+ return savepointFuture.handleAsync(new BiFunction<String, Throwable, TriggerSavepointResponse>() {
+ @Override
+ public TriggerSavepointResponse apply(String savepointPath, Throwable throwable) {
+ if (throwable == null) {
+ return new TriggerSavepointResponse.Success(jobGraph.getJobID(), savepointPath);
+ }
+ else {
+ return new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+ new Exception("Failed to complete savepoint", throwable));
+ }
+ }
+ }, getMainThreadExecutor());
+
+ } catch (Exception e) {
+ FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>();
+ future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+ new Exception("Failed to trigger savepoint", e)));
+ return future;
+ }
+ } else {
+ FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>();
+ future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+ new IllegalStateException("Checkpointing disabled. You can enable it via the execution " +
+ "environment of your job.")));
+ return future;
+ }
+ } else {
+ FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>();
+ future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+ new IllegalArgumentException("Received trigger savepoint request for unavailable job " +
+ jobGraph.getJobID())));
+ return future;
+ }
+ }
+
+ @RpcMethod
+ public DisposeSavepointResponse disposeSavepoint(final String savepointPath) {
+ try {
+ log.info("Disposing savepoint at {}.", savepointPath);
+
+ // check whether the savepoint exists
+ savepointStore.loadSavepoint(savepointPath);
+
+ savepointStore.disposeSavepoint(savepointPath);
+ return new DisposeSavepointResponse.Success();
+ } catch (Exception e) {
+ log.error("Failed to dispose savepoint at {}.", savepointPath, e);
+ return new DisposeSavepointResponse.Failure(e);
+ }
+ }
+
+ @RpcMethod
+ public ClassloadingProps requestClassloadingProps() throws Exception {
+ if (executionGraph != null) {
+ return new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+ executionGraph.getRequiredJarFiles(),
+ executionGraph.getRequiredClasspaths());
+ } else {
+ throw new Exception("Received classloading props request for unavailable job " + jobGraph.getJobID());
+ }
+ }
+
//----------------------------------------------------------------------------------------------
// Internal methods
//----------------------------------------------------------------------------------------------
+ private void handleFatalError(final Throwable cause) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
+ shutDown();
+ jobCompletionActions.onFatalError(cause);
+ }
+ });
+ }
+
// TODO - wrap this as StatusListenerMessenger's callback with rpc main thread
private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
final JobID jobID = executionGraph.getJobID();
http://git-wip-us.apache.org/repos/asf/flink/blob/94a0646c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index e3e57d4..4b51258 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.jobmaster;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -30,8 +28,18 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
+import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import java.util.UUID;
@@ -110,4 +118,81 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param resourceID identifying the TaskManager to disconnect
*/
void disconnectTaskManager(ResourceID resourceID);
+ void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
+
+ /**
+ * Notifies the JobManager about the removal of a resource.
+ *
+ * @param resourceId The ID under which the resource is registered.
+ * @param message Optional message with details, for logging and debugging.
+ */
+
+ void resourceRemoved(final ResourceID resourceId, final String message);
+
+ /**
+ * Notifies the JobManager that the checkpoint of an individual task is completed.
+ *
+ * @param acknowledge The acknowledge message of the checkpoint
+ */
+ void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge);
+
+ /**
+ * Notifies the JobManager that a checkpoint request could not be heeded.
+ * This can happen if a Task is already in RUNNING state but is internally not yet ready to perform checkpoints.
+ *
+ * @param decline The decline message of the checkpoint
+ */
+ void declineCheckpoint(final DeclineCheckpoint decline);
+
+ /**
+ * Requests a {@link KvStateLocation} for the specified {@link KvState} registration name.
+ *
+ * @param registrationName Name under which the KvState has been registered.
+ * @return Future of the requested {@link KvState} location
+ */
+ Future<KvStateLocation> lookupKvStateLocation(final String registrationName) throws Exception;
+
+ /**
+ * @param jobVertexId JobVertexID the KvState instance belongs to.
+ * @param keyGroupRange Key group range the KvState instance belongs to.
+ * @param registrationName Name under which the KvState has been registered.
+ * @param kvStateId ID of the registered KvState instance.
+ * @param kvStateServerAddress Server address where to find the KvState instance.
+ */
+ void notifyKvStateRegistered(
+ final JobVertexID jobVertexId,
+ final KeyGroupRange keyGroupRange,
+ final String registrationName,
+ final KvStateID kvStateId,
+ final KvStateServerAddress kvStateServerAddress);
+
+ /**
+ * @param jobVertexId JobVertexID the KvState instance belongs to.
+ * @param keyGroupRange Key group range the KvState instance belongs to.
+ * @param registrationName Name under which the KvState has been registered.
+ */
+ void notifyKvStateUnregistered(
+ JobVertexID jobVertexId,
+ KeyGroupRange keyGroupRange,
+ String registrationName);
+
+ /**
+ * Notifies the JobManager to trigger a savepoint for this job.
+ *
+ * @return Future of the savepoint trigger response.
+ */
+ Future<TriggerSavepointResponse> triggerSavepoint();
+
+ /**
+ * Notifies the JobManager to dispose the specified savepoint.
+ *
+ * @param savepointPath The path of the savepoint.
+ * @return The future of the savepoint dispose response.
+ */
+ Future<DisposeSavepointResponse> disposeSavepoint(final String savepointPath);
+
+ /**
+ * Request the classloading props of this job.
+ */
+ Future<ClassloadingProps> requestClassloadingProps();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/94a0646c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
new file mode 100644
index 0000000..2d670b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import org.apache.flink.runtime.blob.BlobKey;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * The response of classloading props request to JobManager.
+ */
+public class ClassloadingProps implements Serializable {
+
+ private static final long serialVersionUID = -3282341310808511823L;
+
+ private final int blobManagerPort;
+
+ private final List<BlobKey> requiredJarFiles;
+
+ private final List<URL> requiredClasspaths;
+
+ /**
+ * Constructor of ClassloadingProps.
+ *
+ * @param blobManagerPort The port of the blobManager
+ * @param requiredJarFiles The blob keys of the required jar files
+ * @param requiredClasspaths The urls of the required classpaths
+ */
+ public ClassloadingProps(
+ final int blobManagerPort,
+ final List<BlobKey> requiredJarFiles,
+ final List<URL> requiredClasspaths)
+ {
+ this.blobManagerPort = blobManagerPort;
+ this.requiredJarFiles = requiredJarFiles;
+ this.requiredClasspaths = requiredClasspaths;
+ }
+
+ public int getBlobManagerPort() {
+ return blobManagerPort;
+ }
+
+ public List<BlobKey> getRequiredJarFiles() {
+ return requiredJarFiles;
+ }
+
+ public List<URL> getRequiredClasspaths() {
+ return requiredClasspaths;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/94a0646c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
new file mode 100644
index 0000000..42bfc71
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import java.io.Serializable;
+
+/**
+ * The response of the dispose savepoint request to JobManager.
+ */
+public abstract class DisposeSavepointResponse implements Serializable {
+
+ private static final long serialVersionUID = 6008792963949369567L;
+
+ public static class Success extends DisposeSavepointResponse implements Serializable {
+
+ private static final long serialVersionUID = 1572462960008711415L;
+ }
+
+ public static class Failure extends DisposeSavepointResponse implements Serializable {
+
+ private static final long serialVersionUID = -7505308325483022458L;
+
+ private final Throwable cause;
+
+ public Failure(final Throwable cause) {
+ this.cause = cause;
+ }
+
+ public Throwable getCause() {
+ return cause;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/94a0646c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
new file mode 100644
index 0000000..0b0edc5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+
+/**
+ * The response of the trigger savepoint request to JobManager.
+ */
+public abstract class TriggerSavepointResponse implements Serializable {
+
+ private static final long serialVersionUID = 3139327824611807707L;
+
+ private final JobID jobID;
+
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ public TriggerSavepointResponse(final JobID jobID) {
+ this.jobID = jobID;
+ }
+
+ public static class Success extends TriggerSavepointResponse implements Serializable {
+
+ private static final long serialVersionUID = -1100637460388881776L;
+
+ private final String savepointPath;
+
+ public Success(final JobID jobID, final String savepointPath) {
+ super(jobID);
+ this.savepointPath = savepointPath;
+ }
+
+ public String getSavepointPath() {
+ return savepointPath;
+ }
+ }
+
+ public static class Failure extends TriggerSavepointResponse implements Serializable {
+
+ private static final long serialVersionUID = -1668479003490615139L;
+
+ private final Throwable cause;
+
+ public Failure(final JobID jobID, final Throwable cause) {
+ super(jobID);
+ this.cause = cause;
+ }
+
+ public Throwable getCause() {
+ return cause;
+ }
+ }
+}
+