You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/20 15:51:36 UTC

flink git commit: [FLINK-7534] Create LegacyRestHandlerAdapter for old REST handlers

Repository: flink
Updated Branches:
  refs/heads/master 55b76d54f -> dbabdb1cc


[FLINK-7534] Create LegacyRestHandlerAdapter for old REST handlers

Introduce the LegacyRestHandler interface, which the old REST handlers have to implement
in order to be usable by the RestServerEndpoint in combination with the
LegacyRestHandlerAdapter. The LegacyRestHandlerAdapter extends the AbstractRestHandler
and runs the LegacyRestHandler implementation.

As an example, this commit ports the ClusterOverviewHandler to the new interface. The
Dispatcher side still has to be properly implemented: for now,
Dispatcher#requestStatusOverview returns hard-coded placeholder values.

This closes #4603.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbabdb1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbabdb1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbabdb1c

Branch: refs/heads/master
Commit: dbabdb1cc2c122dbf1e83ffb9960491eaf4914bb
Parents: 55b76d5
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 18 16:18:19 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 20 17:50:27 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  15 +++
 .../runtime/dispatcher/DispatcherGateway.java   |   3 +
 .../dispatcher/DispatcherRestEndpoint.java      |  35 ++++-
 .../entrypoint/SessionClusterEntrypoint.java    |  12 +-
 .../messages/webmonitor/JobsOverview.java       |  25 +++-
 .../messages/webmonitor/StatusOverview.java     |  25 +++-
 .../webmonitor/StatusOverviewWithVersion.java   | 128 +++++++++++++++++++
 .../rest/handler/AbstractRestHandler.java       |   4 +-
 .../runtime/rest/handler/LegacyRestHandler.java |  38 ++++++
 .../rest/handler/LegacyRestHandlerAdapter.java  |  60 +++++++++
 .../runtime/rest/handler/RedirectHandler.java   |  79 +++++++-----
 .../handler/legacy/ClusterOverviewHandler.java  |  38 ++++--
 .../rest/messages/ClusterOverviewHeaders.java   |  72 +++++++++++
 .../rest/messages/EmptyMessageParameters.java   |  46 +++++++
 .../runtime/rest/messages/EmptyRequestBody.java |  25 ++++
 .../StatusOverviewWithVersionTest.java          |  60 +++++++++
 16 files changed, 602 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 521fd8b..6b9999c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
@@ -243,6 +244,20 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		return restAddressFuture;
 	}
 
+	@Override
+	public CompletableFuture<StatusOverview> requestStatusOverview(Time timeout) {
+		// TODO: Implement proper cluster overview generation
+		return CompletableFuture.completedFuture(
+			new StatusOverview(
+				42,
+				1337,
+				1337,
+				5,
+				6,
+				7,
+				8));
+	}
+
 	/**
 	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
 	 * the data will also be removed from HA.

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 398befb..ee5484e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -53,4 +54,6 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
 	 */
 	CompletableFuture<Collection<JobID>> listJobs(
 		@RpcTimeout Time timeout);
+
+	CompletableFuture<StatusOverview> requestStatusOverview(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index debd674..1f64c67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -20,11 +20,16 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FileUtils;
@@ -34,10 +39,11 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * REST endpoint for the {@link Dispatcher} component.
@@ -47,20 +53,36 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 	private final GatewayRetriever<DispatcherGateway> leaderRetriever;
 	private final Time timeout;
 	private final File tmpDir;
+	private final Executor executor;
 
 	public DispatcherRestEndpoint(
 			RestServerEndpointConfiguration configuration,
 			GatewayRetriever<DispatcherGateway> leaderRetriever,
 			Time timeout,
-			File tmpDir) {
+			File tmpDir,
+			Executor executor) {
 		super(configuration);
 		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.tmpDir = Preconditions.checkNotNull(tmpDir);
+		this.executor = Preconditions.checkNotNull(executor);
 	}
 
 	@Override
 	protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(2);
+
+		LegacyRestHandlerAdapter<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			ClusterOverviewHeaders.getInstance(),
+			new ClusterOverviewHandler(
+				executor,
+				timeout));
+
+		handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
+
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
 
 		try {
@@ -74,11 +96,10 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			optWebContent = Optional.empty();
 		}
 
-		return optWebContent
-			.map(webContent ->
-				Collections.singleton(
-					Tuple2.<RestHandlerSpecification, ChannelInboundHandler>of(WebContentHandlerSpecification.getInstance(), webContent)))
-			.orElseGet(() -> Collections.emptySet());
+		optWebContent.ifPresent(
+			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
+
+		return handlers;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 80bd384..9f4e04a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -43,6 +43,7 @@ import org.apache.flink.util.FlinkException;
 
 import java.io.File;
 import java.util.Optional;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for session cluster entry points.
@@ -81,7 +82,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 
 		dispatcherRestEndpoint = createDispatcherRestEndpoint(
 			configuration,
-			dispatcherGatewayRetriever);
+			dispatcherGatewayRetriever,
+			rpcService.getExecutor());
 
 		LOG.debug("Starting Dispatcher REST endpoint.");
 		dispatcherRestEndpoint.start();
@@ -151,8 +153,9 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 	}
 
 	protected DispatcherRestEndpoint createDispatcherRestEndpoint(
-		Configuration configuration,
-		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever) throws Exception {
+			Configuration configuration,
+			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+			Executor executor) throws Exception {
 
 		Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
 		File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
@@ -161,7 +164,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			RestServerEndpointConfiguration.fromConfiguration(configuration),
 			dispatcherGatewayRetriever,
 			timeout,
-			tmpDir);
+			tmpDir,
+			executor);
 	}
 
 	protected Dispatcher createDispatcher(

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
index 084e97d..9526bc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
@@ -18,20 +18,39 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 /**
  * An overview of how many jobs are in which status.
  */
 public class JobsOverview implements InfoMessage {
 
 	private static final long serialVersionUID = -3699051943490133183L;
-	
+
+	public static final String FIELD_NAME_JOBS_RUNNING = "jobs-running";
+	public static final String FIELD_NAME_JOBS_FINISHED = "jobs-finished";
+	public static final String FIELD_NAME_JOBS_CANCELLED = "jobs-cancelled";
+	public static final String FIELD_NAME_JOBS_FAILED = "jobs-failed";
+
+	@JsonProperty(FIELD_NAME_JOBS_RUNNING)
 	private final int numJobsRunningOrPending;
+
+	@JsonProperty(FIELD_NAME_JOBS_FINISHED)
 	private final int numJobsFinished;
+
+	@JsonProperty(FIELD_NAME_JOBS_CANCELLED)
 	private final int numJobsCancelled;
+
+	@JsonProperty(FIELD_NAME_JOBS_FAILED)
 	private final int numJobsFailed;
 
-	public JobsOverview(int numJobsRunningOrPending, int numJobsFinished,
-						int numJobsCancelled, int numJobsFailed) {
+	@JsonCreator
+	public JobsOverview(
+			@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
+			@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
+			@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
+			@JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed) {
 		
 		this.numJobsRunningOrPending = numJobsRunningOrPending;
 		this.numJobsFinished = numJobsFinished;

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
index 214141e..2c04b7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 /**
  * Response to the {@link RequestStatusOverview} message, carrying a description
  * of the Flink cluster status.
@@ -25,13 +28,29 @@ package org.apache.flink.runtime.messages.webmonitor;
 public class StatusOverview extends JobsOverview {
 
 	private static final long serialVersionUID = -729861859715105265L;
-	
+
+	public static final String FIELD_NAME_TASKMANAGERS = "taskmanagers";
+	public static final String FIELD_NAME_SLOTS_TOTAL = "slots-total";
+	public static final String FIELD_NAME_SLOTS_AVAILABLE = "slots-available";
+
+	@JsonProperty(FIELD_NAME_TASKMANAGERS)
 	private final int numTaskManagersConnected;
+
+	@JsonProperty(FIELD_NAME_SLOTS_TOTAL)
 	private final int numSlotsTotal;
+
+	@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
 	private final int numSlotsAvailable;
 
-	public StatusOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
-							int numJobsRunningOrPending, int numJobsFinished, int numJobsCancelled, int numJobsFailed) {
+	@JsonCreator
+	public StatusOverview(
+			@JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
+			@JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
+			@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+			@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
+			@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
+			@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
+			@JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed) {
 
 		super(numJobsRunningOrPending, numJobsFinished, numJobsCancelled, numJobsFailed);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
new file mode 100644
index 0000000..9029537
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Status overview message including the current Flink version and commit id.
+ */
+public class StatusOverviewWithVersion extends StatusOverview implements ResponseBody {
+
+	private static final long serialVersionUID = 5000058311783413216L;
+
+	public static final String FIELD_NAME_VERSION = "flink-version";
+	public static final String FIELD_NAME_COMMIT = "flink-commit";
+
+	@JsonProperty(FIELD_NAME_VERSION)
+	private final String version;
+
+	@JsonProperty(FIELD_NAME_COMMIT)
+	private final String commitId;
+
+	@JsonCreator
+	public StatusOverviewWithVersion(
+			@JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
+			@JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
+			@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+			@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
+			@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
+			@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
+			@JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed,
+			@JsonProperty(FIELD_NAME_VERSION) String version,
+			@JsonProperty(FIELD_NAME_COMMIT) String commitId) {
+		super(
+			numTaskManagersConnected,
+			numSlotsTotal,
+			numSlotsAvailable,
+			numJobsRunningOrPending,
+			numJobsFinished,
+			numJobsCancelled,
+			numJobsFailed);
+
+		this.version = Preconditions.checkNotNull(version);
+		this.commitId = Preconditions.checkNotNull(commitId);
+	}
+
+	public StatusOverviewWithVersion(
+			int numTaskManagersConnected,
+			int numSlotsTotal,
+			int numSlotsAvailable,
+			JobsOverview jobs1,
+			JobsOverview jobs2,
+			String version,
+			String commitId) {
+		super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2);
+
+		this.version = Preconditions.checkNotNull(version);
+		this.commitId = Preconditions.checkNotNull(commitId);
+	}
+
+	public static StatusOverviewWithVersion fromStatusOverview(StatusOverview statusOverview, String version, String commitId) {
+		return new StatusOverviewWithVersion(
+			statusOverview.getNumTaskManagersConnected(),
+			statusOverview.getNumSlotsTotal(),
+			statusOverview.getNumSlotsAvailable(),
+			statusOverview.getNumJobsRunningOrPending(),
+			statusOverview.getNumJobsFinished(),
+			statusOverview.getNumJobsCancelled(),
+			statusOverview.getNumJobsFailed(),
+			version,
+			commitId);
+	}
+
+	public String getVersion() {
+		return version;
+	}
+
+	public String getCommitId() {
+		return commitId;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+
+		StatusOverviewWithVersion that = (StatusOverviewWithVersion) o;
+
+		return Objects.equals(version, that.getVersion()) && Objects.equals(commitId, that.getCommitId());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = super.hashCode();
+		result = 31 * result + (version != null ? version.hashCode() : 0);
+		result = 31 * result + (commitId != null ? commitId.hashCode() : 0);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 596c947..697c046 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -66,11 +66,11 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 	private final MessageHeaders<R, P, M> messageHeaders;
 
 	protected AbstractRestHandler(
-			CompletableFuture<String> localAddressFuture,
+			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<T> leaderRetriever,
 			Time timeout,
 			MessageHeaders<R, P, M> messageHeaders) {
-		super(localAddressFuture, leaderRetriever, timeout);
+		super(localRestAddress, leaderRetriever, timeout);
 		this.messageHeaders = messageHeaders;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandler.java
new file mode 100644
index 0000000..3e70575
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface which Flink's legacy REST handler have to implement in order to be usable
+ * via the {@link LegacyRestHandlerAdapter}.
+ *
+ * @param <T> type of the gateway
+ * @param <R> type of the REST response
+ */
+public interface LegacyRestHandler<T extends RestfulGateway, R extends ResponseBody, M extends MessageParameters> {
+
+	CompletableFuture<R> handleRequest(HandlerRequest<EmptyRequestBody, M> request, T gateway);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
new file mode 100644
index 0000000..e9eaff7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Adapter for Flink's legacy REST handlers.
+ *
+ * @param <T> type of the gateway
+ * @param <R> type of the REST response
+ * @param <M> type of the MessageParameters
+ */
+public class LegacyRestHandlerAdapter<T extends RestfulGateway, R extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<T, EmptyRequestBody, R, M> {
+
+	private final LegacyRestHandler<T, R, M> legacyRestHandler;
+
+	public LegacyRestHandlerAdapter(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<T> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
+			LegacyRestHandler<T, R, M> legacyRestHandler) {
+		super(localRestAddress, leaderRetriever, timeout, messageHeaders);
+
+		this.legacyRestHandler = Preconditions.checkNotNull(legacyRestHandler);
+	}
+
+	@Override
+	protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, M> request, @Nonnull T gateway) throws RestHandlerException {
+		return legacyRestHandler.handleRequest(request, gateway);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
index 83550cd..dfede98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
@@ -34,6 +34,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,22 +79,24 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 		ChannelHandlerContext channelHandlerContext,
 		Routed routed) throws Exception {
 
-		try {
-			if (localAddressFuture.isDone()) {
-				if (localAddress == null) {
-					try {
-						localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-					} catch (Exception e) {
-						logger.error("Could not obtain local address.", e);
+		if (localAddressFuture.isDone()) {
+			if (localAddress == null) {
+				try {
+					localAddress = localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+				} catch (Exception e) {
+					logger.error("Could not obtain local address.", e);
 
-						HandlerUtils.sendErrorResponse(
-							channelHandlerContext,
-							routed.request(),
-							new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."),
-							HttpResponseStatus.INTERNAL_SERVER_ERROR);
-					}
+					HandlerUtils.sendErrorResponse(
+						channelHandlerContext,
+						routed.request(),
+						new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."),
+						HttpResponseStatus.INTERNAL_SERVER_ERROR);
+
+					return;
 				}
+			}
 
+			try {
 				OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
 
 				optLeaderConsumer.ifPresent(
@@ -103,34 +106,42 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 							gateway,
 							timeout);
 
+						// retain the message for the asynchronous handler
+						ReferenceCountUtil.retain(routed);
+
 						optRedirectAddressFuture.whenComplete(
 							(Optional<String> optRedirectAddress, Throwable throwable) -> {
 								HttpResponse response;
-								if (throwable != null) {
-									logger.error("Could not retrieve the redirect address.", throwable);
+								try {
+									if (throwable != null) {
+										logger.error("Could not retrieve the redirect address.", throwable);
 
 									HandlerUtils.sendErrorResponse(
 										channelHandlerContext,
 										routed.request(),
 										new ErrorResponseBody("Could not retrieve the redirect address of the current leader. Please try to refresh."),
 										HttpResponseStatus.INTERNAL_SERVER_ERROR);
-								} else if (optRedirectAddress.isPresent()) {
-									response = HandlerRedirectUtils.getRedirectResponse(
-										optRedirectAddress.get(),
-										routed.path());
-
-									KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
-								} else {
-									try {
-										respondAsLeader(channelHandlerContext, routed, gateway);
-									} catch (Exception e) {
-										logger.error("Error while responding as leader.", e);
+									} else if (optRedirectAddress.isPresent()) {
+										response = HandlerRedirectUtils.getRedirectResponse(
+											optRedirectAddress.get(),
+											routed.path());
+
+										KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
+									} else {
+										try {
+											respondAsLeader(channelHandlerContext, routed, gateway);
+										} catch (Exception e) {
+											logger.error("Error while responding as leader.", e);
 										HandlerUtils.sendErrorResponse(
-											channelHandlerContext,
-											routed.request(),
+												channelHandlerContext,
+												routed.request(),
 											new ErrorResponseBody("Error while responding to the request."),
 											HttpResponseStatus.INTERNAL_SERVER_ERROR);
+										}
 									}
+								} finally {
+									// release the message after processing it asynchronously
+									ReferenceCountUtil.release(routed);
 								}
 							}
 						);
@@ -142,19 +153,21 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 							routed.request(),
 							new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. Please refresh."),
 							HttpResponseStatus.SERVICE_UNAVAILABLE));
-			} else {
+
+			} catch (Throwable throwable) {
+				logger.warn("Error occurred while processing web request.", throwable);
+
 				HandlerUtils.sendErrorResponse(
 					channelHandlerContext,
 					routed.request(),
-					new ErrorResponseBody("Local address has not been resolved. This indicates an internal error."),
+					new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'),
 					HttpResponseStatus.INTERNAL_SERVER_ERROR);
 			}
-		} catch (Throwable throwable) {
-			logger.warn("Error occurred while processing web request.", throwable);
+		} else {
 			HandlerUtils.sendErrorResponse(
 				channelHandlerContext,
 				routed.request(),
-				new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'),
+				new ErrorResponseBody("Local address has not been resolved. This indicates an internal error."),
 				HttpResponseStatus.INTERNAL_SERVER_ERROR);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index db13633..9340fa2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -21,8 +21,15 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.FlinkException;
 
@@ -34,15 +41,16 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders.CLUSTER_OVERVIEW_REST_PATH;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Responder that returns the status of the Flink cluster, such as how many
  * TaskManagers are currently connected, and how many jobs are running.
  */
-public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> {
+
 
-	private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
 
 	private static final String version = EnvironmentInformation.getVersion();
 
@@ -74,16 +82,16 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 							JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
 							gen.writeStartObject();
-							gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
-							gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
-							gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
-							gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
-							gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
-							gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
-							gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
-							gen.writeStringField("flink-version", version);
+							gen.writeNumberField(StatusOverview.FIELD_NAME_TASKMANAGERS, overview.getNumTaskManagersConnected());
+							gen.writeNumberField(StatusOverview.FIELD_NAME_SLOTS_TOTAL, overview.getNumSlotsTotal());
+							gen.writeNumberField(StatusOverview.FIELD_NAME_SLOTS_AVAILABLE, overview.getNumSlotsAvailable());
+							gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_RUNNING, overview.getNumJobsRunningOrPending());
+							gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_FINISHED, overview.getNumJobsFinished());
+							gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_CANCELLED, overview.getNumJobsCancelled());
+							gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_FAILED, overview.getNumJobsFailed());
+							gen.writeStringField(StatusOverviewWithVersion.FIELD_NAME_VERSION, version);
 							if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
-								gen.writeStringField("flink-commit", commitID);
+								gen.writeStringField(StatusOverviewWithVersion.FIELD_NAME_COMMIT, commitID);
 							}
 							gen.writeEndObject();
 
@@ -102,4 +110,12 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 			return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
 		}
 	}
+
+	@Override
+	public CompletableFuture<StatusOverviewWithVersion> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
+		CompletableFuture<StatusOverview> overviewFuture = gateway.requestStatusOverview(timeout);
+
+		return overviewFuture.thenApply(
+			statusOverview -> StatusOverviewWithVersion.fromStatusOverview(statusOverview, version, commitID));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
new file mode 100644
index 0000000..f0f98ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link ClusterOverviewHandler}.
+ */
+public final class ClusterOverviewHeaders implements MessageHeaders<EmptyRequestBody, StatusOverviewWithVersion, EmptyMessageParameters> {
+
+	private static final ClusterOverviewHeaders INSTANCE = new ClusterOverviewHeaders();
+
+	public static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
+
+	// make this class a singleton
+	private ClusterOverviewHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return CLUSTER_OVERVIEW_REST_PATH;
+	}
+
+	@Override
+	public Class<StatusOverviewWithVersion> getResponseClass() {
+		return StatusOverviewWithVersion.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	public static ClusterOverviewHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java
new file mode 100644
index 0000000..82889bd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * MessageParameters implementation which has no parameters.
+ */
+public class EmptyMessageParameters extends MessageParameters {
+
+	private static final EmptyMessageParameters INSTANCE = new EmptyMessageParameters();
+
+	private EmptyMessageParameters() {}
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.emptyList();
+	}
+
+	public static EmptyMessageParameters getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
new file mode 100644
index 0000000..603c3d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Request which does not have a request payload.
+ */
+public class EmptyRequestBody implements RequestBody {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
new file mode 100644
index 0000000..d69049e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link StatusOverviewWithVersion}.
+ */
+public class StatusOverviewWithVersionTest extends TestLogger {
+
+	/**
+	 * Tests that we can marshal and unmarshal StatusOverviewWithVersion.
+	 */
+	@Test
+	public void testJsonMarshalling() throws JsonProcessingException {
+		final StatusOverviewWithVersion expected = new StatusOverviewWithVersion(
+			1,
+			3,
+			3,
+			7,
+			4,
+			2,
+			0,
+			"version",
+			"commit");
+
+		ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+		JsonNode json = objectMapper.valueToTree(expected);
+
+		final StatusOverviewWithVersion unmarshalled = objectMapper.treeToValue(json, StatusOverviewWithVersion.class);
+
+		assertEquals(expected, unmarshalled);
+	}
+}