You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/02 20:29:45 UTC
[4/5] flink git commit: [FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint
[FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint
This commit implements the CheckpointConfigHandler for the new REST endpoint.
The handler returns a CheckpointConfigInfo object if checkpointing is enabled
for the job. If checkpointing is disabled for the job, it returns a 404 response.
This closes #4744.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b41f5a66
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b41f5a66
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b41f5a66
Branch: refs/heads/master
Commit: b41f5a66cd6d62bf3c271f1d0bf9d8fa50a5d410
Parents: 172a64c
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 28 18:35:50 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Oct 2 19:58:59 2017 +0200
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 11 ++
.../job/AbstractExecutionGraphHandler.java | 11 +-
.../checkpoints/CheckpointConfigHandler.java | 85 +++++++++++
.../checkpoints/CheckpointConfigHandler.java | 19 +--
.../rest/messages/CheckpointConfigHeaders.java | 70 +++++++++
.../rest/messages/CheckpointConfigInfo.java | 151 +++++++++++++++++++
.../messages/CheckpointConfigInfoTest.java | 45 ++++++
7 files changed, 381 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 150490d..7d7e32c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
@@ -39,6 +40,7 @@ import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpeci
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
@@ -148,6 +150,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executionGraphCache,
executor);
+ CheckpointConfigHandler checkpointConfigHandler = new CheckpointConfigHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ CheckpointConfigHeaders.getInstance(),
+ executionGraphCache,
+ executor);
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -169,6 +179,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
+ handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index efe3758..f2b1ac8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -70,9 +71,15 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, gateway);
return executionGraphFuture.thenApplyAsync(
- this::handleRequest,
+ executionGraph -> {
+ try {
+ return handleRequest(executionGraph);
+ } catch (RestHandlerException rhe) {
+ throw new CompletionException(rhe);
+ }
+ },
executor);
}
- protected abstract R handleRequest(AccessExecutionGraph executionGraph);
+ protected abstract R handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
new file mode 100644
index 0000000..94646eb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler which serves the checkpoint configuration.
+ */
+public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo> {
+
+ public CheckpointConfigHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor) {
+ super(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ messageHeaders,
+ executionGraphCache,
+ executor);
+ }
+
+ @Override
+ protected CheckpointConfigInfo handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException {
+ final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration();
+
+ if (checkpointCoordinatorConfiguration == null) {
+ throw new RestHandlerException(
+ "Checkpointing is not enabled for this job (" + executionGraph.getJobID() + ").",
+ HttpResponseStatus.NOT_FOUND);
+ } else {
+ ExternalizedCheckpointSettings externalizedCheckpointSettings = checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings();
+
+ CheckpointConfigInfo.ExternalizedCheckpointInfo externalizedCheckpointInfo = new CheckpointConfigInfo.ExternalizedCheckpointInfo(
+ externalizedCheckpointSettings.externalizeCheckpoints(),
+ externalizedCheckpointSettings.deleteOnCancellation());
+
+ return new CheckpointConfigInfo(
+ checkpointCoordinatorConfiguration.isExactlyOnce() ? CheckpointConfigInfo.ProcessingMode.EXACTLY_ONCE : CheckpointConfigInfo.ProcessingMode.AT_LEAST_ONCE,
+ checkpointCoordinatorConfiguration.getCheckpointInterval(),
+ checkpointCoordinatorConfiguration.getCheckpointTimeout(),
+ checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(),
+ checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(),
+ externalizedCheckpointInfo);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index 69a59f5..f50c42d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
@@ -93,20 +94,20 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
gen.writeStartObject();
{
- gen.writeStringField("mode", jobCheckpointingConfiguration.isExactlyOnce() ? "exactly_once" : "at_least_once");
- gen.writeNumberField("interval", jobCheckpointingConfiguration.getCheckpointInterval());
- gen.writeNumberField("timeout", jobCheckpointingConfiguration.getCheckpointTimeout());
- gen.writeNumberField("min_pause", jobCheckpointingConfiguration.getMinPauseBetweenCheckpoints());
- gen.writeNumberField("max_concurrent", jobCheckpointingConfiguration.getMaxConcurrentCheckpoints());
+ gen.writeStringField(CheckpointConfigInfo.FIELD_NAME_PROCESSING_MODE, jobCheckpointingConfiguration.isExactlyOnce() ? "exactly_once" : "at_least_once");
+ gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_INTERVAL, jobCheckpointingConfiguration.getCheckpointInterval());
+ gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_TIMEOUT, jobCheckpointingConfiguration.getCheckpointTimeout());
+ gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_MIN_PAUSE, jobCheckpointingConfiguration.getMinPauseBetweenCheckpoints());
+ gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_MAX_CONCURRENT, jobCheckpointingConfiguration.getMaxConcurrentCheckpoints());
ExternalizedCheckpointSettings externalization = jobCheckpointingConfiguration.getExternalizedCheckpointSettings();
- gen.writeObjectFieldStart("externalization");
+ gen.writeObjectFieldStart(CheckpointConfigInfo.FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG);
{
if (externalization.externalizeCheckpoints()) {
- gen.writeBooleanField("enabled", true);
- gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation());
+ gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_ENABLED, true);
+ gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_DELETE_ON_CANCELLATION, externalization.deleteOnCancellation());
} else {
- gen.writeBooleanField("enabled", false);
+ gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_ENABLED, false);
}
}
gen.writeEndObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
new file mode 100644
index 0000000..bfc0b7a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigHeaders implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
+
+ private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders();
+
+ public static final String URL = "/jobs/:jobid/checkpoints/config";
+
+ private CheckpointConfigHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<CheckpointConfigInfo> getResponseClass() {
+ return CheckpointConfigInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static CheckpointConfigHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
new file mode 100644
index 0000000..fbda12a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response class of the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_PROCESSING_MODE = "mode";
+
+ public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
+
+ public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
+
+ public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause";
+
+ public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent";
+
+ public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization";
+
+ @JsonProperty(FIELD_NAME_PROCESSING_MODE)
+ private final ProcessingMode processingMode;
+
+ @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL)
+ private final long checkpointInterval;
+
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT)
+ private final long checkpointTimeout;
+
+ @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE)
+ private final long minPauseBetweenCheckpoints;
+
+ @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT)
+ private final long maxConcurrentCheckpoints;
+
+ @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG)
+ private final ExternalizedCheckpointInfo externalizedCheckpointInfo;
+
+ @JsonCreator
+ public CheckpointConfigInfo(
+ @JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long checkpointInterval,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long checkpointTimeout,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long minPauseBetweenCheckpoints,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int maxConcurrentCheckpoints,
+ @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo) {
+ this.processingMode = Preconditions.checkNotNull(processingMode);
+ this.checkpointInterval = checkpointInterval;
+ this.checkpointTimeout = checkpointTimeout;
+ this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+ this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+ this.externalizedCheckpointInfo = Preconditions.checkNotNull(externalizedCheckpointInfo);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CheckpointConfigInfo that = (CheckpointConfigInfo) o;
+ return checkpointInterval == that.checkpointInterval &&
+ checkpointTimeout == that.checkpointTimeout &&
+ minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
+ maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
+ processingMode == that.processingMode &&
+ Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointInfo);
+ }
+
+ /**
+ * Contains information about the externalized checkpoint configuration.
+ */
+ public static final class ExternalizedCheckpointInfo {
+
+ public static final String FIELD_NAME_ENABLED = "enabled";
+
+ public static final String FIELD_NAME_DELETE_ON_CANCELLATION = "delete_on_cancellation";
+
+ @JsonProperty(FIELD_NAME_ENABLED)
+ private final boolean enabled;
+
+ @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION)
+ private final boolean deleteOnCancellation;
+
+ @JsonCreator
+ public ExternalizedCheckpointInfo(
+ @JsonProperty(FIELD_NAME_ENABLED) boolean enabled,
+ @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) {
+ this.enabled = enabled;
+ this.deleteOnCancellation = deleteOnCancellation;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExternalizedCheckpointInfo that = (ExternalizedCheckpointInfo) o;
+ return enabled == that.enabled &&
+ deleteOnCancellation == that.deleteOnCancellation;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(enabled, deleteOnCancellation);
+ }
+ }
+
+ /**
+ * Processing mode.
+ */
+ public enum ProcessingMode {
+ AT_LEAST_ONCE,
+ EXACTLY_ONCE
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b41f5a66/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
new file mode 100644
index 0000000..709c21d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
+
+/**
+ * Tests for the {@link CheckpointConfigInfo}.
+ */
+public class CheckpointConfigInfoTest extends RestResponseMarshallingTestBase<CheckpointConfigInfo> {
+ @Override
+ protected Class<CheckpointConfigInfo> getTestResponseClass() {
+ return CheckpointConfigInfo.class;
+ }
+
+ @Override
+ protected CheckpointConfigInfo getTestResponseInstance() {
+ final CheckpointConfigInfo.ExternalizedCheckpointInfo externalizedCheckpointInfo = new CheckpointConfigInfo.ExternalizedCheckpointInfo(true, false);
+
+ return new CheckpointConfigInfo(
+ CheckpointConfigInfo.ProcessingMode.AT_LEAST_ONCE,
+ 1L,
+ 2L,
+ 3L,
+ 4,
+ externalizedCheckpointInfo);
+
+ }
+}