You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/02 20:29:45 UTC

[4/5] flink git commit: [FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint

[FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint

This commit implements the CheckpointConfigHandler which now returns a
CheckpointConfigInfo object if checkpointing is enabled. In case that
checkpointing is disabled for a job, it will return a 404 response.

This closes #4744.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b41f5a66
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b41f5a66
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b41f5a66

Branch: refs/heads/master
Commit: b41f5a66cd6d62bf3c271f1d0bf9d8fa50a5d410
Parents: 172a64c
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 28 18:35:50 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Oct 2 19:58:59 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  11 ++
 .../job/AbstractExecutionGraphHandler.java      |  11 +-
 .../checkpoints/CheckpointConfigHandler.java    |  85 +++++++++++
 .../checkpoints/CheckpointConfigHandler.java    |  19 +--
 .../rest/messages/CheckpointConfigHeaders.java  |  70 +++++++++
 .../rest/messages/CheckpointConfigInfo.java     | 151 +++++++++++++++++++
 .../messages/CheckpointConfigInfoTest.java      |  45 ++++++
 7 files changed, 381 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 150490d..7d7e32c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
@@ -39,6 +40,7 @@ import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpeci
 import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
 import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
@@ -148,6 +150,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executionGraphCache,
 			executor);
 
+		CheckpointConfigHandler checkpointConfigHandler = new CheckpointConfigHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			CheckpointConfigHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -169,6 +179,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
 		handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
 		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
+		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index efe3758..f2b1ac8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nonnull;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -70,9 +71,15 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
 		CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, gateway);
 
 		return executionGraphFuture.thenApplyAsync(
-			this::handleRequest,
+			executionGraph -> {
+				try {
+					return handleRequest(executionGraph);
+				} catch (RestHandlerException rhe) {
+					throw new CompletionException(rhe);
+				}
+			},
 			executor);
 	}
 
-	protected abstract R handleRequest(AccessExecutionGraph executionGraph);
+	protected abstract R handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
new file mode 100644
index 0000000..94646eb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler which serves the checkpoint configuration.
+ */
+public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo> {
+
+	public CheckpointConfigHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			messageHeaders,
+			executionGraphCache,
+			executor);
+	}
+
+	@Override
+	protected CheckpointConfigInfo handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException {
+		final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration();
+
+		if (checkpointCoordinatorConfiguration == null) {
+			throw new RestHandlerException(
+				"Checkpointing is not enabled for this job (" + executionGraph.getJobID() + ").",
+				HttpResponseStatus.NOT_FOUND);
+		} else {
+			ExternalizedCheckpointSettings externalizedCheckpointSettings = checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings();
+
+			CheckpointConfigInfo.ExternalizedCheckpointInfo externalizedCheckpointInfo = new CheckpointConfigInfo.ExternalizedCheckpointInfo(
+				externalizedCheckpointSettings.externalizeCheckpoints(),
+				externalizedCheckpointSettings.deleteOnCancellation());
+
+			return new CheckpointConfigInfo(
+				checkpointCoordinatorConfiguration.isExactlyOnce() ? CheckpointConfigInfo.ProcessingMode.EXACTLY_ONCE : CheckpointConfigInfo.ProcessingMode.AT_LEAST_ONCE,
+				checkpointCoordinatorConfiguration.getCheckpointInterval(),
+				checkpointCoordinatorConfiguration.getCheckpointTimeout(),
+				checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(),
+				checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(),
+				externalizedCheckpointInfo);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index 69a59f5..f50c42d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
@@ -93,20 +94,20 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 
 		gen.writeStartObject();
 		{
-			gen.writeStringField("mode", jobCheckpointingConfiguration.isExactlyOnce() ? "exactly_once" : "at_least_once");
-			gen.writeNumberField("interval", jobCheckpointingConfiguration.getCheckpointInterval());
-			gen.writeNumberField("timeout", jobCheckpointingConfiguration.getCheckpointTimeout());
-			gen.writeNumberField("min_pause", jobCheckpointingConfiguration.getMinPauseBetweenCheckpoints());
-			gen.writeNumberField("max_concurrent", jobCheckpointingConfiguration.getMaxConcurrentCheckpoints());
+			gen.writeStringField(CheckpointConfigInfo.FIELD_NAME_PROCESSING_MODE, jobCheckpointingConfiguration.isExactlyOnce() ? "exactly_once" : "at_least_once");
+			gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_INTERVAL, jobCheckpointingConfiguration.getCheckpointInterval());
+			gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_TIMEOUT, jobCheckpointingConfiguration.getCheckpointTimeout());
+			gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_MIN_PAUSE, jobCheckpointingConfiguration.getMinPauseBetweenCheckpoints());
+			gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_MAX_CONCURRENT, jobCheckpointingConfiguration.getMaxConcurrentCheckpoints());
 
 			ExternalizedCheckpointSettings externalization = jobCheckpointingConfiguration.getExternalizedCheckpointSettings();
-			gen.writeObjectFieldStart("externalization");
+			gen.writeObjectFieldStart(CheckpointConfigInfo.FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG);
 			{
 				if (externalization.externalizeCheckpoints()) {
-					gen.writeBooleanField("enabled", true);
-					gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation());
+					gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_ENABLED, true);
+					gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_DELETE_ON_CANCELLATION, externalization.deleteOnCancellation());
 				} else {
-					gen.writeBooleanField("enabled", false);
+					gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_ENABLED, false);
 				}
 			}
 			gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
new file mode 100644
index 0000000..bfc0b7a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigHeaders implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
+
+	private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders();
+
+	public static final String URL = "/jobs/:jobid/checkpoints/config";
+
+	private CheckpointConfigHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<CheckpointConfigInfo> getResponseClass() {
+		return CheckpointConfigInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static CheckpointConfigHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
new file mode 100644
index 0000000..fbda12a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response class of the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_PROCESSING_MODE = "mode";
+
+	public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
+
+	public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
+
+	public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause";
+
+	public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent";
+
+	public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization";
+
+	@JsonProperty(FIELD_NAME_PROCESSING_MODE)
+	private final ProcessingMode processingMode;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL)
+	private final long checkpointInterval;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT)
+	private final long checkpointTimeout;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE)
+	private final long minPauseBetweenCheckpoints;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT)
+	private final long maxConcurrentCheckpoints;
+
+	@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG)
+	private final ExternalizedCheckpointInfo externalizedCheckpointInfo;
+
+	@JsonCreator
+	public CheckpointConfigInfo(
+			@JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long checkpointInterval,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long checkpointTimeout,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long minPauseBetweenCheckpoints,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int maxConcurrentCheckpoints,
+			@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo) {
+		this.processingMode = Preconditions.checkNotNull(processingMode);
+		this.checkpointInterval = checkpointInterval;
+		this.checkpointTimeout = checkpointTimeout;
+		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+		this.externalizedCheckpointInfo = Preconditions.checkNotNull(externalizedCheckpointInfo);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		CheckpointConfigInfo that = (CheckpointConfigInfo) o;
+		return checkpointInterval == that.checkpointInterval &&
+			checkpointTimeout == that.checkpointTimeout &&
+			minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
+			maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
+			processingMode == that.processingMode &&
+			Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointInfo);
+	}
+
+	/**
+	 * Contains information about the externalized checkpoint configuration.
+	 */
+	public static final class ExternalizedCheckpointInfo {
+
+		public static final String FIELD_NAME_ENABLED = "enabled";
+
+		public static final String FIELD_NAME_DELETE_ON_CANCELLATION = "delete_on_cancellation";
+
+		@JsonProperty(FIELD_NAME_ENABLED)
+		private final boolean enabled;
+
+		@JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION)
+		private final boolean deleteOnCancellation;
+
+		@JsonCreator
+		public ExternalizedCheckpointInfo(
+				@JsonProperty(FIELD_NAME_ENABLED) boolean enabled,
+				@JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) {
+			this.enabled = enabled;
+			this.deleteOnCancellation = deleteOnCancellation;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			ExternalizedCheckpointInfo that = (ExternalizedCheckpointInfo) o;
+			return enabled == that.enabled &&
+				deleteOnCancellation == that.deleteOnCancellation;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(enabled, deleteOnCancellation);
+		}
+	}
+
+	/**
+	 * Processing mode.
+	 */
+	public enum ProcessingMode {
+		AT_LEAST_ONCE,
+		EXACTLY_ONCE
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
new file mode 100644
index 0000000..709c21d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
+
+/**
+ * Tests for the {@link CheckpointConfigInfo}.
+ */
+public class CheckpointConfigInfoTest extends RestResponseMarshallingTestBase<CheckpointConfigInfo> {
+	@Override
+	protected Class<CheckpointConfigInfo> getTestResponseClass() {
+		return CheckpointConfigInfo.class;
+	}
+
+	@Override
+	protected CheckpointConfigInfo getTestResponseInstance() {
+		final CheckpointConfigInfo.ExternalizedCheckpointInfo externalizedCheckpointInfo = new CheckpointConfigInfo.ExternalizedCheckpointInfo(true, false);
+
+		return new CheckpointConfigInfo(
+			CheckpointConfigInfo.ProcessingMode.AT_LEAST_ONCE,
+			1L,
+			2L,
+			3L,
+			4,
+			externalizedCheckpointInfo);
+
+	}
+}