You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 08:33:53 UTC

[flink] 02/02: [FLINK-7551][rest] Add versioning

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 14ae6e5df54d0855e414270c5c107839d0ee1c76
Author: zentol <ch...@apache.org>
AuthorDate: Wed Aug 22 12:04:29 2018 +0200

    [FLINK-7551][rest] Add versioning
---
 ...est_dispatcher.html => rest_v1_dispatcher.html} |   0
 docs/monitoring/rest_api.md                        |  51 +++--
 .../flink/docs/rest/RestAPIDocGenerator.java       |  18 +-
 .../HistoryServerStaticFileServerHandler.java      |   2 +-
 .../HistoryServerStaticFileServerHandlerTest.java  |   8 +-
 .../org/apache/flink/runtime/rest/RestClient.java  |  32 ++-
 .../flink/runtime/rest/RestServerEndpoint.java     |  44 +++-
 .../rest/handler/RestHandlerSpecification.java     |  13 ++
 .../runtime/rest/versioning/RestAPIVersion.java    |  98 ++++++++
 .../apache/flink/runtime/rest/RestClientTest.java  |  28 ++-
 .../runtime/rest/RestServerEndpointITCase.java     | 251 +++++++++++++++++++++
 .../rest/versioning/RestAPIVersionTest.java        |  52 +++++
 12 files changed, 561 insertions(+), 36 deletions(-)

diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html
similarity index 100%
rename from docs/_includes/generated/rest_dispatcher.html
rename to docs/_includes/generated/rest_v1_dispatcher.html
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index eefc8b9..85ec0ac 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -52,13 +52,26 @@ To add new requests, one needs to
 A good example is the `org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler` that uses the `org.apache.flink.runtime.rest.messages.JobExceptionsHeaders`.
 
 
-## Available Requests
+## API
 
-### Dispatcher
+The REST API is versioned; a specific version can be queried by prefixing the URL with the version prefix. Prefixes are always of the form `v[version_number]`.
+For example, to access version 1 of `/foo/bar` one would query `/v1/foo/bar`.
 
-{% include generated/rest_dispatcher.html %}
+If no version is specified, Flink will default to the *oldest* version supporting the request.
 
-## Legacy
+Querying unsupported/non-existing versions will return a 404 error.
+
+<span class="label label-danger">Attention</span> REST API versioning is *not* active if the cluster runs in [legacy mode](../ops/config.html#mode). For this case please refer to the legacy API below.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="v1" markdown="1">
+#### Dispatcher
+
+{% include generated/rest_v1_dispatcher.html %}
+</div>
+
+<div data-lang="legacy" markdown="1">
 
 This section is only relevant if the cluster runs in [legacy mode](../ops/config.html#mode).
 
@@ -90,7 +103,7 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<
   - `/jars/:jarid/run`
 
 
-### General
+#### General
 
 **`/config`**
 
@@ -126,7 +139,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Overview of Jobs
+#### Overview of Jobs
 
 **`/jobs/overview`**
 
@@ -163,7 +176,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Details of a Running or Completed Job
+#### Details of a Running or Completed Job
 
 **`/jobs/<jobid>`**
 
@@ -573,15 +586,15 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Job Cancellation
+#### Job Cancellation
 
-#### Cancel Job
+##### Cancel Job
 
 `DELETE` request to **`/jobs/:jobid/cancel`**.
 
 Triggers job cancellation, result on success is `{}`.
 
-#### Cancel Job with Savepoint
+##### Cancel Job with Savepoint
 
 Triggers a savepoint and cancels the job after the savepoint succeeds.
 
@@ -601,7 +614,7 @@ Sample Trigger Result:
 }
 {% endhighlight %}
 
-##### Monitoring Progress
+###### Monitoring Progress
 
 The progress of the cancellation has to be monitored by the user at
 
@@ -611,7 +624,7 @@ The progress of the cancellation has to be monitored by the user at
 
 The request ID is returned by the trigger result.
 
-###### In-Progress
+###### In-Progress
 
 {% highlight json %}
 {
@@ -620,7 +633,7 @@ The request ID is returned by the trigger result.
 }
 {% endhighlight %}
 
-###### Success
+###### Success
 
 {% highlight json %}
 {
@@ -632,7 +645,7 @@ The request ID is returned by the trigger result.
 
 The `savepointPath` points to the external path of the savepoint, which can be used to resume the savepoint.
 
-###### Failed
+###### Failed
 
 {% highlight json %}
 {
@@ -642,11 +655,11 @@ The `savepointPath` points to the external path of the savepoint, which can be u
 }
 {% endhighlight %}
 
-### Submitting Programs
+#### Submitting Programs
 
 It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.
 
-#### Upload a new JAR file
+##### Upload a new JAR file
 
 Send a `POST` request to `/jars/upload` with your jar file sent as multi-part data under the `jarfile` file.
 Also make sure that the multi-part data includes the `Content-Type` of the file itself, some http libraries do not add the header by default.
@@ -659,7 +672,7 @@ Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar"
 Content-Type: application/x-java-archive
 {% endhighlight %}
 
-#### Run a Program (POST)
+##### Run a Program (POST)
 
 Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key `web.upload.dir`).
 
@@ -688,3 +701,7 @@ Response:
 {% endhighlight %}
 
 {% top %}
+</div>
+
+</div>
+
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 82fdeec..4df1d6e 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -123,13 +124,24 @@ public class RestAPIDocGenerator {
 	public static void main(String[] args) throws IOException {
 		String outputDirectory = args[0];
 
-		createHtmlFile(new DocumentingDispatcherRestEndpoint(), Paths.get(outputDirectory, "rest_dispatcher.html"));
+		for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
+			if (apiVersion == RestAPIVersion.V0) {
+				// this version exists only for testing purposes
+				continue;
+			}
+			createHtmlFile(
+				new DocumentingDispatcherRestEndpoint(),
+				apiVersion,
+				Paths.get(outputDirectory, "rest_" + apiVersion.getURLVersionPrefix() + "_dispatcher.html"));
+		}
 	}
 
-	private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, Path outputFile) throws IOException {
+	private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile) throws IOException {
 		StringBuilder html = new StringBuilder();
 
-		List<MessageHeaders> specs = restEndpoint.getSpecs();
+		List<MessageHeaders> specs = restEndpoint.getSpecs().stream()
+			.filter(spec -> spec.getSupportedAPIVersions().contains(apiVersion))
+			.collect(Collectors.toList());
 		specs.forEach(spec -> html.append(createHtmlEntry(spec)));
 
 		Files.deleteIfExists(outputFile);
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index 2042088..d8542d6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -164,7 +164,7 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
 						HandlerUtils.sendErrorResponse(
 							ctx,
 							request,
-							new ErrorResponseBody(String.format("Unable to load requested file %s.", requestPath)),
+							new ErrorResponseBody("File not found."),
 							NOT_FOUND,
 							Collections.emptyMap());
 						return;
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
index b08504d..19a3d52 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 
+import static org.hamcrest.CoreMatchers.containsString;
+
 /**
  * Tests for the HistoryServerStaticFileServerHandler.
  */
@@ -56,7 +58,7 @@ public class HistoryServerStaticFileServerHandlerTest {
 		try {
 			// verify that 404 message is returned when requesting a non-existent file
 			String notFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/hello");
-			Assert.assertTrue(notFound404.contains("404 Not Found"));
+			Assert.assertThat(notFound404, containsString("not found"));
 
 			// verify that a) a file can be loaded using the ClassLoader and b) that the HistoryServer
 			// index_hs.html is injected
@@ -71,12 +73,12 @@ public class HistoryServerStaticFileServerHandlerTest {
 			File dir = new File(webDir, "dir.json");
 			dir.mkdirs();
 			String dirNotFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/dir");
-			Assert.assertTrue(dirNotFound404.contains("404 Not Found"));
+			Assert.assertTrue(dirNotFound404.contains("not found"));
 
 			// verify that a 404 message is returned when requesting a file outside the webDir
 			tmp.newFile("secret");
 			String x = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/../secret");
-			Assert.assertTrue(x.contains("404 Not Found"));
+			Assert.assertTrue(x.contains("not found"));
 		} finally {
 			webUI.shutdown();
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2e9de4c..a855749 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -85,6 +86,7 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
 
@@ -173,6 +175,24 @@ public class RestClient {
 			U messageParameters,
 			R request,
 			Collection<FileUpload> fileUploads) throws IOException {
+		return sendRequest(
+			targetAddress,
+			targetPort,
+			messageHeaders,
+			messageParameters,
+			request,
+			fileUploads,
+			RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
+	}
+
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+			String targetAddress,
+			int targetPort,
+			M messageHeaders,
+			U messageParameters,
+			R request,
+			Collection<FileUpload> fileUploads,
+			RestAPIVersion apiVersion) throws IOException {
 		Preconditions.checkNotNull(targetAddress);
 		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
 		Preconditions.checkNotNull(messageHeaders);
@@ -181,7 +201,17 @@ public class RestClient {
 		Preconditions.checkNotNull(fileUploads);
 		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
 
-		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+		if (!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) {
+			throw new IllegalArgumentException(String.format(
+				"The requested version %s is not supported by the request (method=%s URL=%s). Supported versions are: %s.",
+				apiVersion,
+				messageHeaders.getHttpMethod(),
+				messageHeaders.getTargetRestEndpointURL(),
+				messageHeaders.getSupportedAPIVersions().stream().map(RestAPIVersion::getURLVersionPrefix).collect(Collectors.joining(","))));
+		}
+
+		String versionedHandlerURL = "/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
+		String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
 
 		LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
 		// serialize payload
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index e836e35..28af072 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
@@ -144,8 +145,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 				RestHandlerUrlComparator.INSTANCE);
 
 			handlers.forEach(handler -> {
-				log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
-				registerHandler(router, handler);
+				registerHandler(router, handler, log);
 			});
 
 			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -364,22 +364,37 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 		}
 	}
 
-	private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
-		switch (specificationHandler.f0.getHttpMethod()) {
+	private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler, Logger log) {
+		final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL();
+		// setup versioned urls
+		for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) {
+			final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL;
+			log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL);
+			registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+			if (supportedVersion.isDefaultVersion()) {
+				// setup unversioned url for convenience and backwards compatibility
+				log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL);
+				registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+			}
+		}
+	}
+
+	private static void registerHandler(Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
+		switch (httpMethod) {
 			case GET:
-				router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addGet(handlerURL, handler);
 				break;
 			case POST:
-				router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addPost(handlerURL, handler);
 				break;
 			case DELETE:
-				router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addDelete(handlerURL, handler);
 				break;
 			case PATCH:
-				router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addPatch(handlerURL, handler);
 				break;
 			default:
-				throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.');
+				throw new RuntimeException("Unsupported http method: " + httpMethod + '.');
 		}
 	}
 
@@ -437,13 +452,22 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 
 		private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
 
+		private static final Comparator<RestAPIVersion> API_VERSION_ORDER = new RestAPIVersion.RestAPIVersionComparator();
+
 		static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();
 
 		@Override
 		public int compare(
 				Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1,
 				Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) {
-			return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+			final int urlComparisonResult = CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+			if (urlComparisonResult != 0) {
+				return urlComparisonResult;
+			} else {
+				return API_VERSION_ORDER.compare(
+					Collections.min(o1.f0.getSupportedAPIVersions()),
+					Collections.min(o2.f0.getSupportedAPIVersions()));
+			}
 		}
 
 		/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
index 4ebcd49..6561679 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -19,6 +19,10 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Rest handler interface which all rest handler implementation have to implement.
@@ -38,4 +42,13 @@ public interface RestHandlerSpecification {
 	 * @return endpoint url that this request should be sent to
 	 */
 	String getTargetRestEndpointURL();
+
+	/**
+	 * Returns the API versions that this request supports.
+	 *
+	 * @return Collection of supported API versions
+	 */
+	default Collection<RestAPIVersion> getSupportedAPIVersions() {
+		return Collections.singleton(RestAPIVersion.V1);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
new file mode 100644
index 0000000..d630563
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * An enum for all versions of the REST API.
+ *
+ * <p>REST API versions are global and thus apply to every REST component.
+ *
+ * <p>Changes that must result in an API version increment include but are not limited to:
+ * - modification of a handler url
+ * - addition of new mandatory parameters
+ * - removal of a handler/request
+ * - modifications to request/response bodies (excluding additions)
+ */
+public enum RestAPIVersion {
+	V0(0, false), // strictly for testing purposes
+	V1(1, true);
+
+	private final int versionNumber;
+
+	private final boolean isDefaultVersion;
+
+	RestAPIVersion(int versionNumber, boolean isDefaultVersion) {
+		this.versionNumber = versionNumber;
+		this.isDefaultVersion = isDefaultVersion;
+	}
+
+	/**
+	 * Returns the URL version prefix (e.g. "v1") for this version.
+	 *
+	 * @return URL version prefix
+	 */
+	public String getURLVersionPrefix() {
+		return name().toLowerCase();
+	}
+
+	/**
+	 * Returns whether this version is the default REST API version.
+	 *
+	 * @return whether this version is the default
+	 */
+	public boolean isDefaultVersion() {
+		return isDefaultVersion;
+	}
+
+	/**
+	 * Converts the given URL version prefix (e.g "v1") to a {@link RestAPIVersion}.
+	 *
+	 * @param prefix prefix to be converted
+	 * @return REST API version matching the prefix
+	 * @throws IllegalArgumentException if the prefix doesn't match any version
+	 */
+	public static RestAPIVersion fromURLVersionPrefix(String prefix) {
+		return valueOf(prefix.toUpperCase());
+	}
+
+	/**
+	 * Returns the latest version from the given collection.
+	 *
+	 * @param versions possible candidates
+	 * @return latest version
+	 */
+	public static RestAPIVersion getLatestVersion(Collection<RestAPIVersion> versions) {
+		return Collections.max(versions, new RestAPIVersionComparator());
+	}
+
+	/**
+	 * Comparator for {@link RestAPIVersion} that sorts versions based on their version number, i.e. oldest to latest.
+	 */
+	public static class RestAPIVersionComparator implements Comparator<RestAPIVersion> {
+
+		@Override
+		public int compare(RestAPIVersion o1, RestAPIVersion o2) {
+			return Integer.compare(o1.versionNumber, o2.versionNumber);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 209f2d1..22cd6f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -25,14 +25,18 @@ import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -45,12 +49,13 @@ import static org.junit.Assert.assertThat;
  */
 public class RestClientTest extends TestLogger {
 
+	private static final String unroutableIp = "10.255.255.1";
+
 	@Test
 	public void testConnectionTimeout() throws Exception {
 		final Configuration config = new Configuration();
 		config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
 		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
-		final String unroutableIp = "10.255.255.1";
 		try {
 			restClient.sendRequest(
 				unroutableIp,
@@ -66,6 +71,27 @@ public class RestClientTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testInvalidVersionRejection() throws Exception {
+		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor());
+
+		try {
+			CompletableFuture<EmptyResponseBody> invalidVersionResponse = restClient.sendRequest(
+				unroutableIp,
+				80,
+				new TestMessageHeaders(),
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				Collections.emptyList(),
+				RestAPIVersion.V0
+			);
+			Assert.fail("The request should have been rejected due to a version mismatch.");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+
+	}
+
 	private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 31f78e3..b017610 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -56,8 +57,12 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
 import org.apache.commons.io.IOUtils;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -85,6 +90,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
@@ -184,6 +190,21 @@ public class RestServerEndpointITCase extends TestLogger {
 			mockGatewayRetriever,
 			RpcUtils.INF_TIMEOUT);
 
+		TestVersionHandler testVersionHandler = new TestVersionHandler(
+			CompletableFuture.completedFuture(restAddress),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		TestVersionSelectionHandler1 testVersionSelectionHandler1 = new TestVersionSelectionHandler1(
+			CompletableFuture.completedFuture(restAddress),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		TestVersionSelectionHandler2 testVersionSelectionHandler2 = new TestVersionSelectionHandler2(
+			CompletableFuture.completedFuture(restAddress),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
 		testUploadHandler = new TestUploadHandler(
 			CompletableFuture.completedFuture(restAddress),
 			mockGatewayRetriever,
@@ -198,6 +219,9 @@ public class RestServerEndpointITCase extends TestLogger {
 		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
 			Tuple2.of(new TestHeaders(), testHandler),
 			Tuple2.of(TestUploadHeaders.INSTANCE, testUploadHandler),
+			Tuple2.of(testVersionHandler.getMessageHeaders(), testVersionHandler),
+			Tuple2.of(testVersionSelectionHandler1.getMessageHeaders(), testVersionSelectionHandler1),
+			Tuple2.of(testVersionSelectionHandler2.getMessageHeaders(), testVersionSelectionHandler2),
 			Tuple2.of(WebContentHandlerSpecification.getInstance(), staticFileServerHandler));
 
 		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
@@ -415,6 +439,88 @@ public class RestServerEndpointITCase extends TestLogger {
 		assertEquals("foobar", fileContents.trim());
 	}
 
+	@Test
+	public void testVersioning() throws Exception {
+		CompletableFuture<EmptyResponseBody> unspecifiedVersionResponse = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionHeaders.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList()
+		);
+
+		unspecifiedVersionResponse.get(5, TimeUnit.SECONDS);
+
+		CompletableFuture<EmptyResponseBody> specifiedVersionResponse = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionHeaders.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList(),
+			RestAPIVersion.V1
+		);
+
+		specifiedVersionResponse.get(5, TimeUnit.SECONDS);
+	}
+
+	@Test
+	public void testVersionSelection() throws Exception {
+		CompletableFuture<EmptyResponseBody> version1Response = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionSelectionHeaders1.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList(),
+			RestAPIVersion.V0
+		);
+
+		try {
+			version1Response.get(5, TimeUnit.SECONDS);
+			fail();
+		} catch (ExecutionException ee) {
+			RestClientException rce = (RestClientException) ee.getCause();
+			assertEquals(HttpResponseStatus.OK, rce.getHttpResponseStatus());
+		}
+
+		CompletableFuture<EmptyResponseBody> version2Response = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionSelectionHeaders2.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList(),
+			RestAPIVersion.V1
+		);
+
+		try {
+			version2Response.get(5, TimeUnit.SECONDS);
+			fail();
+		} catch (ExecutionException ee) {
+			RestClientException rce = (RestClientException) ee.getCause();
+			assertEquals(HttpResponseStatus.ACCEPTED, rce.getHttpResponseStatus());
+		}
+	}
+
+	@Test
+	public void testDefaultVersionRouting() throws Exception {
+		Assume.assumeFalse(
+			"Ignoring SSL-enabled test to keep OkHttp usage simple.",
+			config.getBoolean(SecurityOptions.SSL_REST_ENABLED));
+
+		OkHttpClient client = new OkHttpClient();
+
+		final Request request = new Request.Builder()
+			.url(serverEndpoint.getRestBaseUrl() + TestVersionSelectionHeaders2.INSTANCE.getTargetRestEndpointURL())
+			.build();
+
+		try (final Response response = client.newCall(request).execute()) {
+			assertEquals(HttpResponseStatus.ACCEPTED.code(), response.code());
+		}
+	}
+
 	private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
 		final HttpURLConnection connection =
 			(HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -697,6 +803,151 @@ public class RestServerEndpointITCase extends TestLogger {
 		}
 	}
 
+	/** Handler bound to {@link TestVersionHeaders}; its {@code handleRequest} returns an already-completed empty response. */
+	private static class TestVersionHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private TestVersionHandler(final CompletableFuture<String> localRestAddress, final GatewayRetriever<? extends RestfulGateway> leaderRetriever, final Time timeout) {
+			super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionHeaders.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			@Nonnull RestfulGateway gateway) throws RestHandlerException {
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+	}
+
+	/** Headers for GET {@code /test/versioning}; lists {@link RestAPIVersion#V1} as the only supported API version. */
+	private enum TestVersionHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+		INSTANCE;
+		@Override
+		public HttpMethodWrapper getHttpMethod() {
+			return HttpMethodWrapper.GET;
+		}
+
+		@Override
+		public String getTargetRestEndpointURL() {
+			return "/test/versioning";
+		}
+
+		@Override
+		public Collection<RestAPIVersion> getSupportedAPIVersions() {
+			return Collections.singleton(RestAPIVersion.V1);
+		}
+
+		@Override
+		public Class<EmptyRequestBody> getRequestClass() {
+			return EmptyRequestBody.class;
+		}
+
+		@Override
+		public Class<EmptyResponseBody> getResponseClass() {
+			return EmptyResponseBody.class;
+		}
+
+		@Override
+		public HttpResponseStatus getResponseStatusCode() {
+			return HttpResponseStatus.OK;
+		}
+
+		@Override
+		public EmptyMessageParameters getUnresolvedMessageParameters() {
+			return EmptyMessageParameters.getInstance();
+		}
+
+		@Override
+		public String getDescription() {
+			return null;
+		}
+	}
+
+	/** Shared defaults (GET {@code /test/select-version}, empty bodies, status OK); implementors add the supported API versions. */
+	private interface TestVersionSelectionHeadersBase extends MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+		@Override
+		default HttpMethodWrapper getHttpMethod() {
+			return HttpMethodWrapper.GET;
+		}
+
+		@Override
+		default String getTargetRestEndpointURL() {
+			return "/test/select-version";
+		}
+
+		@Override
+		default Class<EmptyRequestBody> getRequestClass() {
+			return EmptyRequestBody.class;
+		}
+
+		@Override
+		default Class<EmptyResponseBody> getResponseClass() {
+			return EmptyResponseBody.class;
+		}
+
+		@Override
+		default HttpResponseStatus getResponseStatusCode() {
+			return HttpResponseStatus.OK;
+		}
+
+		@Override
+		default EmptyMessageParameters getUnresolvedMessageParameters() {
+			return EmptyMessageParameters.getInstance();
+		}
+
+		@Override
+		default String getDescription() {
+			return null;
+		}
+	}
+
+	/** Variant of {@link TestVersionSelectionHeadersBase} that lists {@link RestAPIVersion#V0} as its only supported version. */
+	private enum TestVersionSelectionHeaders1 implements TestVersionSelectionHeadersBase {
+		INSTANCE;
+		@Override
+		public Collection<RestAPIVersion> getSupportedAPIVersions() {
+			return Collections.singleton(RestAPIVersion.V0);
+		}
+	}
+
+	/** Variant of {@link TestVersionSelectionHeadersBase} that lists {@link RestAPIVersion#V1} as its only supported version. */
+	private enum TestVersionSelectionHeaders2 implements TestVersionSelectionHeadersBase {
+		INSTANCE;
+		@Override
+		public Collection<RestAPIVersion> getSupportedAPIVersions() {
+			return Collections.singleton(RestAPIVersion.V1);
+		}
+	}
+
+	/** Handler bound to {@link TestVersionSelectionHeaders1}; always throws "test failure 1" carrying status OK. */
+	private static class TestVersionSelectionHandler1 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private TestVersionSelectionHandler1(final CompletableFuture<String> localRestAddress, final GatewayRetriever<? extends RestfulGateway> leaderRetriever, final Time timeout) {
+			super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders1.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			@Nonnull RestfulGateway gateway) throws RestHandlerException {
+			throw new RestHandlerException("test failure 1", HttpResponseStatus.OK);
+		}
+	}
+
+	/** Handler bound to {@link TestVersionSelectionHeaders2}; always throws "test failure 2" carrying status ACCEPTED. */
+	private static class TestVersionSelectionHandler2 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private TestVersionSelectionHandler2(final CompletableFuture<String> localRestAddress, final GatewayRetriever<? extends RestfulGateway> leaderRetriever, final Time timeout) {
+			super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders2.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			@Nonnull RestfulGateway gateway) throws RestHandlerException {
+			throw new RestHandlerException("test failure 2", HttpResponseStatus.ACCEPTED);
+		}
+	}
+
 	private enum TestUploadHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 		INSTANCE;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
new file mode 100644
index 0000000..4f60da1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for {@link RestAPIVersion}.
+ */
+public class RestAPIVersionTest extends TestLogger {
+
+	/** Expects {@code getLatestVersion} to pick V1 when offered both V0 and V1. */
+	@Test
+	public void testGetLatest() {
+		final Collection<RestAPIVersion> offered = Arrays.asList(RestAPIVersion.V0, RestAPIVersion.V1);
+		Assert.assertEquals(RestAPIVersion.V1, RestAPIVersion.getLatestVersion(offered));
+	}
+
+	/** Expects exactly one enum constant to report itself as the default version. */
+	@Test
+	public void testSingleDefaultVersion() {
+		final List<RestAPIVersion> defaults = Arrays.stream(RestAPIVersion.values())
+			.filter(RestAPIVersion::isDefaultVersion)
+			.collect(Collectors.toList());
+		final String message = "Only one RestAPIVersion should be marked as the default. Defaults: " + defaults;
+		Assert.assertEquals(message, 1, defaults.size());
+	}
+}