You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 08:33:51 UTC

[flink] branch master updated (b01aff3 -> 14ae6e5)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b01aff3  [FLINK-10115][rest] Ignore content-length limit for FileUploads
     new 79c3412  [hotfix][rest] Update error handling in FileServerHandlers
     new 14ae6e5  [FLINK-7551][rest] Add versioning

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...est_dispatcher.html => rest_v1_dispatcher.html} |   0
 docs/monitoring/rest_api.md                        |  51 +++--
 .../flink/docs/rest/RestAPIDocGenerator.java       |  18 +-
 .../HistoryServerStaticFileServerHandler.java      |  45 +++-
 .../HistoryServerStaticFileServerHandlerTest.java  |   8 +-
 .../org/apache/flink/runtime/rest/RestClient.java  |  32 ++-
 .../flink/runtime/rest/RestServerEndpoint.java     |  44 +++-
 .../rest/handler/RestHandlerSpecification.java     |  13 ++
 .../legacy/files/StaticFileServerHandler.java      |  61 +++--
 .../runtime/rest/versioning/RestAPIVersion.java    |  98 ++++++++
 .../apache/flink/runtime/rest/RestClientTest.java  |  28 ++-
 .../runtime/rest/RestServerEndpointITCase.java     | 251 +++++++++++++++++++++
 .../RestAPIVersionTest.java}                       |  35 +--
 13 files changed, 606 insertions(+), 78 deletions(-)
 rename docs/_includes/generated/{rest_dispatcher.html => rest_v1_dispatcher.html} (100%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
 copy flink-runtime/src/test/java/org/apache/flink/runtime/rest/{handler/legacy/JobStoppingHandlerTest.java => versioning/RestAPIVersionTest.java} (53%)


[flink] 01/02: [hotfix][rest] Update error handling in FileServerHandlers

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 79c341209b90a3cf35f7026d67aa6ddbe02c2c40
Author: zentol <ch...@apache.org>
AuthorDate: Wed Aug 22 11:51:01 2018 +0200

    [hotfix][rest] Update error handling in FileServerHandlers
---
 .../HistoryServerStaticFileServerHandler.java      | 45 +++++++++++++---
 .../legacy/files/StaticFileServerHandler.java      | 61 ++++++++++++++--------
 2 files changed, 77 insertions(+), 29 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index b0c2102..2042088 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -28,6 +28,8 @@ package org.apache.flink.runtime.webmonitor.history;
 
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -57,6 +59,7 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Collections;
 import java.util.Date;
 import java.util.Locale;
 
@@ -158,7 +161,12 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
 				} finally {
 					if (!success) {
 						LOG.debug("Unable to load requested file {} from classloader", pathToLoad);
-						StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+						HandlerUtils.sendErrorResponse(
+							ctx,
+							request,
+							new ErrorResponseBody(String.format("Unable to load requested file %s.", requestPath)),
+							NOT_FOUND,
+							Collections.emptyMap());
 						return;
 					}
 				}
@@ -166,12 +174,22 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
 		}
 
 		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
-			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				Collections.emptyMap());
 			return;
 		}
 
 		if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
-			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				Collections.emptyMap());
 			return;
 		}
 
@@ -204,7 +222,12 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
 		try {
 			raf = new RandomAccessFile(file, "r");
 		} catch (FileNotFoundException e) {
-			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				Collections.emptyMap());
 			return;
 		}
 
@@ -244,7 +267,12 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
 		} catch (Exception e) {
 			raf.close();
 			LOG.error("Failed to serve file.", e);
-			StaticFileServerHandler.sendError(ctx, INTERNAL_SERVER_ERROR);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("Internal server error."),
+				INTERNAL_SERVER_ERROR,
+				Collections.emptyMap());
 		}
 	}
 
@@ -252,7 +280,12 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 		if (ctx.channel().isActive()) {
 			LOG.error("Caught exception", cause);
-			StaticFileServerHandler.sendError(ctx, INTERNAL_SERVER_ERROR);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				false,
+				new ErrorResponseBody("Internal server error."),
+				INTERNAL_SERVER_ERROR,
+				Collections.emptyMap());
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
index 62b94e5..969945f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -29,11 +29,12 @@ package org.apache.flink.runtime.rest.handler.legacy.files;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.handler.util.MimeTypes;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -50,7 +51,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
-import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -184,7 +184,12 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH
 				} finally {
 					if (!success) {
 						logger.debug("Unable to load requested file {} from classloader", requestPath);
-						sendError(ctx, NOT_FOUND);
+						HandlerUtils.sendErrorResponse(
+							ctx,
+							request,
+							new ErrorResponseBody(String.format("Unable to load requested file %s.", requestPath)),
+							NOT_FOUND,
+							responseHeaders);
 						return;
 					}
 				}
@@ -192,12 +197,22 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH
 		}
 
 		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
-			sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				responseHeaders);
 			return;
 		}
 
 		if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
-			sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				responseHeaders);
 			return;
 		}
 
@@ -231,7 +246,12 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH
 			raf = new RandomAccessFile(file, "r");
 		}
 		catch (FileNotFoundException e) {
-			sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				HttpResponseStatus.NOT_FOUND,
+				responseHeaders);
 			return;
 		}
 
@@ -271,7 +291,12 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH
 		} catch (Exception e) {
 			raf.close();
 			logger.error("Failed to serve file.", e);
-			sendError(ctx, INTERNAL_SERVER_ERROR);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("Internal server error."),
+				INTERNAL_SERVER_ERROR,
+				responseHeaders);
 		}
 	}
 
@@ -279,7 +304,12 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 		if (ctx.channel().isActive()) {
 			logger.error("Caught exception", cause);
-			sendError(ctx, INTERNAL_SERVER_ERROR);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				false,
+				new ErrorResponseBody("Internal server error."),
+				INTERNAL_SERVER_ERROR,
+				Collections.emptyMap());
 		}
 	}
 
@@ -288,21 +318,6 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Writes a simple  error response message.
-	 *
-	 * @param ctx    The channel context to write the response to.
-	 * @param status The response status.
-	 */
-	public static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-		FullHttpResponse response = new DefaultFullHttpResponse(
-				HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
-		response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
-
-		// close the connection as soon as the error message is sent.
-		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
-	}
-
-	/**
 	 * Send the "304 Not Modified" response. This response can be used when the
 	 * file timestamp is the same as what the browser is sending up.
 	 *


[flink] 02/02: [FLINK-7551][rest] Add versioning

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 14ae6e5df54d0855e414270c5c107839d0ee1c76
Author: zentol <ch...@apache.org>
AuthorDate: Wed Aug 22 12:04:29 2018 +0200

    [FLINK-7551][rest] Add versioning
---
 ...est_dispatcher.html => rest_v1_dispatcher.html} |   0
 docs/monitoring/rest_api.md                        |  51 +++--
 .../flink/docs/rest/RestAPIDocGenerator.java       |  18 +-
 .../HistoryServerStaticFileServerHandler.java      |   2 +-
 .../HistoryServerStaticFileServerHandlerTest.java  |   8 +-
 .../org/apache/flink/runtime/rest/RestClient.java  |  32 ++-
 .../flink/runtime/rest/RestServerEndpoint.java     |  44 +++-
 .../rest/handler/RestHandlerSpecification.java     |  13 ++
 .../runtime/rest/versioning/RestAPIVersion.java    |  98 ++++++++
 .../apache/flink/runtime/rest/RestClientTest.java  |  28 ++-
 .../runtime/rest/RestServerEndpointITCase.java     | 251 +++++++++++++++++++++
 .../rest/versioning/RestAPIVersionTest.java        |  52 +++++
 12 files changed, 561 insertions(+), 36 deletions(-)

diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html
similarity index 100%
rename from docs/_includes/generated/rest_dispatcher.html
rename to docs/_includes/generated/rest_v1_dispatcher.html
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index eefc8b9..85ec0ac 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -52,13 +52,26 @@ To add new requests, one needs to
 A good example is the `org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler` that uses the `org.apache.flink.runtime.rest.messages.JobExceptionsHeaders`.
 
 
-## Available Requests
+## API
 
-### Dispatcher
+The REST API is versioned; a specific version can be queried by prefixing the URL with the version prefix. Prefixes are always of the form `v[version_number]`.
+For example, to access version 1 of `/foo/bar` one would query `/v1/foo/bar`.
 
-{% include generated/rest_dispatcher.html %}
+If no version is specified, Flink will default to the *oldest* version supporting the request.
 
-## Legacy
+Querying unsupported/non-existing versions will return a 404 error.
+
+<span class="label label-danger">Attention</span> REST API versioning is *not* active if the cluster runs in [legacy mode](../ops/config.html#mode). For this case please refer to the legacy API below.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="v1" markdown="1">
+#### Dispatcher
+
+{% include generated/rest_v1_dispatcher.html %}
+</div>
+
+<div data-lang="legacy" markdown="1">
 
 This section is only relevant if the cluster runs in [legacy mode](../ops/config.html#mode).
 
@@ -90,7 +103,7 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<
   - `/jars/:jarid/run`
 
 
-### General
+#### General
 
 **`/config`**
 
@@ -126,7 +139,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Overview of Jobs
+#### Overview of Jobs
 
 **`/jobs/overview`**
 
@@ -163,7 +176,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Details of a Running or Completed Job
+#### Details of a Running or Completed Job
 
 **`/jobs/<jobid>`**
 
@@ -573,15 +586,15 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Job Cancellation
+#### Job Cancellation
 
-#### Cancel Job
+##### Cancel Job
 
 `DELETE` request to **`/jobs/:jobid/cancel`**.
 
 Triggers job cancellation, result on success is `{}`.
 
-#### Cancel Job with Savepoint
+##### Cancel Job with Savepoint
 
 Triggers a savepoint and cancels the job after the savepoint succeeds.
 
@@ -601,7 +614,7 @@ Sample Trigger Result:
 }
 {% endhighlight %}
 
-##### Monitoring Progress
+###### Monitoring Progress
 
 The progress of the cancellation has to be monitored by the user at
 
@@ -611,7 +624,7 @@ The progress of the cancellation has to be monitored by the user at
 
 The request ID is returned by the trigger result.
 
-###### In-Progress
+**In-Progress**
 
 {% highlight json %}
 {
@@ -620,7 +633,7 @@ The request ID is returned by the trigger result.
 }
 {% endhighlight %}
 
-###### Success
+**Success**
 
 {% highlight json %}
 {
@@ -632,7 +645,7 @@ The request ID is returned by the trigger result.
 
 The `savepointPath` points to the external path of the savepoint, which can be used to resume the savepoint.
 
-###### Failed
+**Failed**
 
 {% highlight json %}
 {
@@ -642,11 +655,11 @@ The `savepointPath` points to the external path of the savepoint, which can be u
 }
 {% endhighlight %}
 
-### Submitting Programs
+#### Submitting Programs
 
 It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.
 
-#### Upload a new JAR file
+##### Upload a new JAR file
 
 Send a `POST` request to `/jars/upload` with your jar file sent as multi-part data under the `jarfile` file.
 Also make sure that the multi-part data includes the `Content-Type` of the file itself, some http libraries do not add the header by default.
@@ -659,7 +672,7 @@ Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar"
 Content-Type: application/x-java-archive
 {% endhighlight %}
 
-#### Run a Program (POST)
+##### Run a Program (POST)
 
 Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key `web.upload.dir`).
 
@@ -688,3 +701,7 @@ Response:
 {% endhighlight %}
 
 {% top %}
+</div>
+
+</div>
+
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 82fdeec..4df1d6e 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -123,13 +124,24 @@ public class RestAPIDocGenerator {
 	public static void main(String[] args) throws IOException {
 		String outputDirectory = args[0];
 
-		createHtmlFile(new DocumentingDispatcherRestEndpoint(), Paths.get(outputDirectory, "rest_dispatcher.html"));
+		for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
+			if (apiVersion == RestAPIVersion.V0) {
+				// this version exists only for testing purposes
+				continue;
+			}
+			createHtmlFile(
+				new DocumentingDispatcherRestEndpoint(),
+				apiVersion,
+				Paths.get(outputDirectory, "rest_" + apiVersion.getURLVersionPrefix() + "_dispatcher.html"));
+		}
 	}
 
-	private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, Path outputFile) throws IOException {
+	private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile) throws IOException {
 		StringBuilder html = new StringBuilder();
 
-		List<MessageHeaders> specs = restEndpoint.getSpecs();
+		List<MessageHeaders> specs = restEndpoint.getSpecs().stream()
+			.filter(spec -> spec.getSupportedAPIVersions().contains(apiVersion))
+			.collect(Collectors.toList());
 		specs.forEach(spec -> html.append(createHtmlEntry(spec)));
 
 		Files.deleteIfExists(outputFile);
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index 2042088..d8542d6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -164,7 +164,7 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
 						HandlerUtils.sendErrorResponse(
 							ctx,
 							request,
-							new ErrorResponseBody(String.format("Unable to load requested file %s.", requestPath)),
+							new ErrorResponseBody("File not found."),
 							NOT_FOUND,
 							Collections.emptyMap());
 						return;
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
index b08504d..19a3d52 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 
+import static org.hamcrest.CoreMatchers.containsString;
+
 /**
  * Tests for the HistoryServerStaticFileServerHandler.
  */
@@ -56,7 +58,7 @@ public class HistoryServerStaticFileServerHandlerTest {
 		try {
 			// verify that 404 message is returned when requesting a non-existent file
 			String notFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/hello");
-			Assert.assertTrue(notFound404.contains("404 Not Found"));
+			Assert.assertThat(notFound404, containsString("not found"));
 
 			// verify that a) a file can be loaded using the ClassLoader and b) that the HistoryServer
 			// index_hs.html is injected
@@ -71,12 +73,12 @@ public class HistoryServerStaticFileServerHandlerTest {
 			File dir = new File(webDir, "dir.json");
 			dir.mkdirs();
 			String dirNotFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/dir");
-			Assert.assertTrue(dirNotFound404.contains("404 Not Found"));
+			Assert.assertTrue(dirNotFound404.contains("not found"));
 
 			// verify that a 404 message is returned when requesting a file outside the webDir
 			tmp.newFile("secret");
 			String x = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/../secret");
-			Assert.assertTrue(x.contains("404 Not Found"));
+			Assert.assertTrue(x.contains("not found"));
 		} finally {
 			webUI.shutdown();
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2e9de4c..a855749 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -85,6 +86,7 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
 
@@ -173,6 +175,24 @@ public class RestClient {
 			U messageParameters,
 			R request,
 			Collection<FileUpload> fileUploads) throws IOException {
+		return sendRequest(
+			targetAddress,
+			targetPort,
+			messageHeaders,
+			messageParameters,
+			request,
+			fileUploads,
+			RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
+	}
+
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+			String targetAddress,
+			int targetPort,
+			M messageHeaders,
+			U messageParameters,
+			R request,
+			Collection<FileUpload> fileUploads,
+			RestAPIVersion apiVersion) throws IOException {
 		Preconditions.checkNotNull(targetAddress);
 		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
 		Preconditions.checkNotNull(messageHeaders);
@@ -181,7 +201,17 @@ public class RestClient {
 		Preconditions.checkNotNull(fileUploads);
 		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
 
-		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+		if (!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) {
+			throw new IllegalArgumentException(String.format(
+				"The requested version %s is not supported by the request (method=%s URL=%s). Supported versions are: %s.",
+				apiVersion,
+				messageHeaders.getHttpMethod(),
+				messageHeaders.getTargetRestEndpointURL(),
+				messageHeaders.getSupportedAPIVersions().stream().map(RestAPIVersion::getURLVersionPrefix).collect(Collectors.joining(","))));
+		}
+
+		String versionedHandlerURL = "/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
+		String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
 
 		LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
 		// serialize payload
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index e836e35..28af072 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
@@ -144,8 +145,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 				RestHandlerUrlComparator.INSTANCE);
 
 			handlers.forEach(handler -> {
-				log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
-				registerHandler(router, handler);
+				registerHandler(router, handler, log);
 			});
 
 			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -364,22 +364,37 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 		}
 	}
 
-	private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
-		switch (specificationHandler.f0.getHttpMethod()) {
+	private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler, Logger log) {
+		final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL();
+		// setup versioned urls
+		for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) {
+			final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL;
+			log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL);
+			registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+			if (supportedVersion.isDefaultVersion()) {
+				// setup unversioned url for convenience and backwards compatibility
+				log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL);
+				registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+			}
+		}
+	}
+
+	private static void registerHandler(Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
+		switch (httpMethod) {
 			case GET:
-				router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addGet(handlerURL, handler);
 				break;
 			case POST:
-				router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addPost(handlerURL, handler);
 				break;
 			case DELETE:
-				router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addDelete(handlerURL, handler);
 				break;
 			case PATCH:
-				router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addPatch(handlerURL, handler);
 				break;
 			default:
-				throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.');
+				throw new RuntimeException("Unsupported http method: " + httpMethod + '.');
 		}
 	}
 
@@ -437,13 +452,22 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 
 		private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
 
+		private static final Comparator<RestAPIVersion> API_VERSION_ORDER = new RestAPIVersion.RestAPIVersionComparator();
+
 		static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();
 
 		@Override
 		public int compare(
 				Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1,
 				Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) {
-			return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+			final int urlComparisonResult = CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+			if (urlComparisonResult != 0) {
+				return urlComparisonResult;
+			} else {
+				return API_VERSION_ORDER.compare(
+					Collections.min(o1.f0.getSupportedAPIVersions()),
+					Collections.min(o2.f0.getSupportedAPIVersions()));
+			}
 		}
 
 		/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
index 4ebcd49..6561679 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -19,6 +19,10 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Rest handler interface which all rest handler implementation have to implement.
@@ -38,4 +42,13 @@ public interface RestHandlerSpecification {
 	 * @return endpoint url that this request should be sent to
 	 */
 	String getTargetRestEndpointURL();
+
+	/**
+	 * Returns the API versions that this request supports.
+	 *
+	 * @return Collection of supported API versions
+	 */
+	default Collection<RestAPIVersion> getSupportedAPIVersions() {
+		return Collections.singleton(RestAPIVersion.V1);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
new file mode 100644
index 0000000..d630563
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * An enum for all versions of the REST API.
+ *
+ * <p>REST API versions are global and thus apply to every REST component.
+ *
+ * <p>Changes that must result in an API version increment include but are not limited to:
+ * - modification of a handler url
+ * - addition of new mandatory parameters
+ * - removal of a handler/request
+ * - modifications to request/response bodies (excluding additions)
+ */
+public enum RestAPIVersion {
+	V0(0, false), // strictly for testing purposes
+	V1(1, true);
+
+	private final int versionNumber;
+
+	private final boolean isDefaultVersion;
+
+	RestAPIVersion(int versionNumber, boolean isDefaultVersion) {
+		this.versionNumber = versionNumber;
+		this.isDefaultVersion = isDefaultVersion;
+	}
+
+	/**
+	 * Returns the URL version prefix (e.g. "v1") for this version.
+	 *
+	 * @return URL version prefix
+	 */
+	public String getURLVersionPrefix() {
+		return name().toLowerCase();
+	}
+
+	/**
+	 * Returns whether this version is the default REST API version.
+	 *
+	 * @return whether this version is the default
+	 */
+	public boolean isDefaultVersion() {
+		return isDefaultVersion;
+	}
+
+	/**
+	 * Converts the given URL version prefix (e.g. "v1") to a {@link RestAPIVersion}.
+	 *
+	 * @param prefix prefix to convert
+	 * @return REST API version matching the prefix
+	 * @throws IllegalArgumentException if the prefix doesn't match any version
+	 */
+	public static RestAPIVersion fromURLVersionPrefix(String prefix) {
+		return valueOf(prefix.toUpperCase());
+	}
+
+	/**
+	 * Returns the latest version from the given collection.
+	 *
+	 * @param versions possible candidates
+	 * @return latest version
+	 */
+	public static RestAPIVersion getLatestVersion(Collection<RestAPIVersion> versions) {
+		return Collections.max(versions, new RestAPIVersionComparator());
+	}
+
+	/**
+	 * Comparator for {@link RestAPIVersion} that sorts versions based on their version number, i.e. oldest to latest.
+	 */
+	public static class RestAPIVersionComparator implements Comparator<RestAPIVersion> {
+
+		@Override
+		public int compare(RestAPIVersion o1, RestAPIVersion o2) {
+			return Integer.compare(o1.versionNumber, o2.versionNumber);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 209f2d1..22cd6f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -25,14 +25,18 @@ import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -45,12 +49,13 @@ import static org.junit.Assert.assertThat;
  */
 public class RestClientTest extends TestLogger {
 
+	private static final String unroutableIp = "10.255.255.1";
+
 	@Test
 	public void testConnectionTimeout() throws Exception {
 		final Configuration config = new Configuration();
 		config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
 		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
-		final String unroutableIp = "10.255.255.1";
 		try {
 			restClient.sendRequest(
 				unroutableIp,
@@ -66,6 +71,27 @@ public class RestClientTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testInvalidVersionRejection() throws Exception {
+		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor());
+
+		try {
+			CompletableFuture<EmptyResponseBody> invalidVersionResponse = restClient.sendRequest(
+				unroutableIp,
+				80,
+				new TestMessageHeaders(),
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				Collections.emptyList(),
+				RestAPIVersion.V0
+			);
+			Assert.fail("The request should have been rejected due to a version mismatch.");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+
+	}
+
 	private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 31f78e3..b017610 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -56,8 +57,12 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
 import org.apache.commons.io.IOUtils;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -85,6 +90,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
@@ -184,6 +190,21 @@ public class RestServerEndpointITCase extends TestLogger {
 			mockGatewayRetriever,
 			RpcUtils.INF_TIMEOUT);
 
+		TestVersionHandler testVersionHandler = new TestVersionHandler(
+			CompletableFuture.completedFuture(restAddress),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		TestVersionSelectionHandler1 testVersionSelectionHandler1 = new TestVersionSelectionHandler1(
+			CompletableFuture.completedFuture(restAddress),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		TestVersionSelectionHandler2 testVersionSelectionHandler2 = new TestVersionSelectionHandler2(
+			CompletableFuture.completedFuture(restAddress),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
 		testUploadHandler = new TestUploadHandler(
 			CompletableFuture.completedFuture(restAddress),
 			mockGatewayRetriever,
@@ -198,6 +219,9 @@ public class RestServerEndpointITCase extends TestLogger {
 		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
 			Tuple2.of(new TestHeaders(), testHandler),
 			Tuple2.of(TestUploadHeaders.INSTANCE, testUploadHandler),
+			Tuple2.of(testVersionHandler.getMessageHeaders(), testVersionHandler),
+			Tuple2.of(testVersionSelectionHandler1.getMessageHeaders(), testVersionSelectionHandler1),
+			Tuple2.of(testVersionSelectionHandler2.getMessageHeaders(), testVersionSelectionHandler2),
 			Tuple2.of(WebContentHandlerSpecification.getInstance(), staticFileServerHandler));
 
 		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
@@ -415,6 +439,88 @@ public class RestServerEndpointITCase extends TestLogger {
 		assertEquals("foobar", fileContents.trim());
 	}
 
+	@Test
+	public void testVersioning() throws Exception {
+		CompletableFuture<EmptyResponseBody> unspecifiedVersionResponse = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionHeaders.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList()
+		);
+
+		unspecifiedVersionResponse.get(5, TimeUnit.SECONDS);
+
+		CompletableFuture<EmptyResponseBody> specifiedVersionResponse = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionHeaders.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList(),
+			RestAPIVersion.V1
+		);
+
+		specifiedVersionResponse.get(5, TimeUnit.SECONDS);
+	}
+
+	@Test
+	public void testVersionSelection() throws Exception {
+		CompletableFuture<EmptyResponseBody> version1Response = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionSelectionHeaders1.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList(),
+			RestAPIVersion.V0
+		);
+
+		try {
+			version1Response.get(5, TimeUnit.SECONDS);
+			fail();
+		} catch (ExecutionException ee) {
+			RestClientException rce = (RestClientException) ee.getCause();
+			assertEquals(HttpResponseStatus.OK, rce.getHttpResponseStatus());
+		}
+
+		CompletableFuture<EmptyResponseBody> version2Response = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionSelectionHeaders2.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList(),
+			RestAPIVersion.V1
+		);
+
+		try {
+			version2Response.get(5, TimeUnit.SECONDS);
+			fail();
+		} catch (ExecutionException ee) {
+			RestClientException rce = (RestClientException) ee.getCause();
+			assertEquals(HttpResponseStatus.ACCEPTED, rce.getHttpResponseStatus());
+		}
+	}
+
+	@Test
+	public void testDefaultVersionRouting() throws Exception {
+		Assume.assumeFalse(
+			"Ignoring SSL-enabled test to keep OkHttp usage simple.",
+			config.getBoolean(SecurityOptions.SSL_REST_ENABLED));
+
+		OkHttpClient client = new OkHttpClient();
+
+		final Request request = new Request.Builder()
+			.url(serverEndpoint.getRestBaseUrl() + TestVersionSelectionHeaders2.INSTANCE.getTargetRestEndpointURL())
+			.build();
+
+		try (final Response response = client.newCall(request).execute()) {
+			assertEquals(HttpResponseStatus.ACCEPTED.code(), response.code());
+		}
+	}
+
 	private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
 		final HttpURLConnection connection =
 			(HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -697,6 +803,151 @@ public class RestServerEndpointITCase extends TestLogger {
 		}
 	}
 
+	private static class TestVersionHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private TestVersionHandler(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout) {
+			super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionHeaders.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+	}
+
+	private enum TestVersionHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+		INSTANCE;
+
+		@Override
+		public Class<EmptyRequestBody> getRequestClass() {
+			return EmptyRequestBody.class;
+		}
+
+		@Override
+		public HttpMethodWrapper getHttpMethod() {
+			return HttpMethodWrapper.GET;
+		}
+
+		@Override
+		public String getTargetRestEndpointURL() {
+			return "/test/versioning";
+		}
+
+		@Override
+		public Class<EmptyResponseBody> getResponseClass() {
+			return EmptyResponseBody.class;
+		}
+
+		@Override
+		public HttpResponseStatus getResponseStatusCode() {
+			return HttpResponseStatus.OK;
+		}
+
+		@Override
+		public String getDescription() {
+			return null;
+		}
+
+		@Override
+		public EmptyMessageParameters getUnresolvedMessageParameters() {
+			return EmptyMessageParameters.getInstance();
+		}
+
+		@Override
+		public Collection<RestAPIVersion> getSupportedAPIVersions() {
+			return Collections.singleton(RestAPIVersion.V1);
+		}
+	}
+
+	private interface TestVersionSelectionHeadersBase extends MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		@Override
+		default Class<EmptyRequestBody> getRequestClass() {
+			return EmptyRequestBody.class;
+		}
+
+		@Override
+		default HttpMethodWrapper getHttpMethod() {
+			return HttpMethodWrapper.GET;
+		}
+
+		@Override
+		default String getTargetRestEndpointURL() {
+			return "/test/select-version";
+		}
+
+		@Override
+		default Class<EmptyResponseBody> getResponseClass() {
+			return EmptyResponseBody.class;
+		}
+
+		@Override
+		default HttpResponseStatus getResponseStatusCode() {
+			return HttpResponseStatus.OK;
+		}
+
+		@Override
+		default String getDescription() {
+			return null;
+		}
+
+		@Override
+		default EmptyMessageParameters getUnresolvedMessageParameters() {
+			return EmptyMessageParameters.getInstance();
+		}
+	}
+
+	private enum TestVersionSelectionHeaders1 implements TestVersionSelectionHeadersBase {
+		INSTANCE;
+
+		@Override
+		public Collection<RestAPIVersion> getSupportedAPIVersions() {
+			return Collections.singleton(RestAPIVersion.V0);
+		}
+	}
+
+	private enum TestVersionSelectionHeaders2 implements TestVersionSelectionHeadersBase {
+		INSTANCE;
+
+		@Override
+		public Collection<RestAPIVersion> getSupportedAPIVersions() {
+			return Collections.singleton(RestAPIVersion.V1);
+		}
+	}
+
+	private static class TestVersionSelectionHandler1 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private TestVersionSelectionHandler1(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout) {
+			super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders1.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			throw new RestHandlerException("test failure 1", HttpResponseStatus.OK);
+		}
+	}
+
+	private static class TestVersionSelectionHandler2 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private TestVersionSelectionHandler2(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout) {
+			super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders2.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			throw new RestHandlerException("test failure 2", HttpResponseStatus.ACCEPTED);
+		}
+	}
+
 	private enum TestUploadHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 		INSTANCE;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
new file mode 100644
index 0000000..4f60da1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for {@link RestAPIVersion}.
+ */
+public class RestAPIVersionTest extends TestLogger {
+	@Test
+	public void testGetLatest() {
+		Collection<RestAPIVersion> candidates = Arrays.asList(RestAPIVersion.V0, RestAPIVersion.V1);
+		Assert.assertEquals(RestAPIVersion.V1, RestAPIVersion.getLatestVersion(candidates));
+	}
+
+	@Test
+	public void testSingleDefaultVersion() {
+		final List<RestAPIVersion> defaultVersions = Arrays.stream(RestAPIVersion.values())
+			.filter(RestAPIVersion::isDefaultVersion)
+			.collect(Collectors.toList());
+
+		Assert.assertEquals(
+			"Only one RestAPIVersion should be marked as the default. Defaults: " + defaultVersions,
+			1,
+			defaultVersions.size());
+	}
+}