Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/03 08:33:48 UTC

[GitHub] zentol closed pull request #6602: [FLINK-7551][rest] Add versioning to REST API

zentol closed pull request #6602:  [FLINK-7551][rest] Add versioning to REST API
URL: https://github.com/apache/flink/pull/6602
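
For orientation, the URL scheme introduced by this PR (a `v[version_number]` prefix in front of the handler URL, with the default version also registered under the unversioned URL) can be sketched roughly as follows. This is a minimal standalone sketch and not the code from the PR: the class `VersionedUrls`, its methods, and the use of plain integers for versions are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Flink): lists the URLs a handler would be reachable under. */
final class VersionedUrls {

    /** Builds the versioned URL, e.g. version 1 and "/foo/bar" give "/v1/foo/bar". */
    static String versioned(int versionNumber, String handlerUrl) {
        return "/v" + versionNumber + handlerUrl;
    }

    /**
     * Returns every URL a handler is registered under: one per supported version,
     * plus the unversioned URL if the default version is among the supported ones.
     */
    static List<String> registeredUrls(String handlerUrl, List<Integer> supportedVersions, int defaultVersion) {
        List<String> urls = new ArrayList<>();
        for (int version : supportedVersions) {
            urls.add(versioned(version, handlerUrl));
            if (version == defaultVersion) {
                // assumption: mirrors the "unversioned url for convenience" idea described in the PR
                urls.add(handlerUrl);
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        // prints "[/v1/foo/bar, /foo/bar]"
        System.out.println(registeredUrls("/foo/bar", Arrays.asList(1), 1));
    }
}
```

Under this sketch, a handler that supports only a non-default version is reachable solely through its prefixed URL, so an unprefixed request for it would not be routed.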

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html
similarity index 100%
rename from docs/_includes/generated/rest_dispatcher.html
rename to docs/_includes/generated/rest_v1_dispatcher.html
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index ec92051b396..fdbaa705e59 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -52,13 +52,26 @@ To add new requests, one needs to
 A good example is the `org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler` that uses the `org.apache.flink.runtime.rest.messages.JobExceptionsHeaders`.
 
 
-## Available Requests
+## API
 
-### Dispatcher
+The REST API is versioned, with specific versions being queryable by prefixing the url with the version prefix. Prefixes are always of the form `v[version_number]`.
+For example, to access version 1 of `/foo/bar` one would query `/v1/foo/bar`.
 
-{% include generated/rest_dispatcher.html %}
+If no version is specified Flink will default to the *oldest* version supporting the request.
 
-## Legacy
+Querying unsupported/non-existing versions will return a 404 error.
+
+<span class="label label-danger">Attention</span> REST API versioning is *not* active if the cluster runs in [legacy mode](../ops/config.html#mode). For this case please refer to the legacy API below.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="v1" markdown="1">
+#### Dispatcher
+
+{% include generated/rest_v1_dispatcher.html %}
+</div>
+
+<div data-lang="legacy" markdown="1">
 
 This section is only relevant if the cluster runs in [legacy mode](../ops/config.html#mode).
 
@@ -90,7 +103,7 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<
   - `/jars/:jarid/run`
 
 
-### General
+#### General
 
 **`/config`**
 
@@ -126,7 +139,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Overview of Jobs
+#### Overview of Jobs
 
 **`/jobs/overview`**
 
@@ -163,7 +176,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Details of a Running or Completed Job
+#### Details of a Running or Completed Job
 
 **`/jobs/<jobid>`**
 
@@ -573,15 +586,15 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Job Cancellation
+#### Job Cancellation
 
-#### Cancel Job
+##### Cancel Job
 
 `DELETE` request to **`/jobs/:jobid/cancel`**.
 
 Triggers job cancellation, result on success is `{}`.
 
-#### Cancel Job with Savepoint
+##### Cancel Job with Savepoint
 
 Triggers a savepoint and cancels the job after the savepoint succeeds.
 
@@ -601,7 +614,7 @@ Sample Trigger Result:
 }
 {% endhighlight %}
 
-##### Monitoring Progress
+###### Monitoring Progress
 
 The progress of the cancellation has to be monitored by the user at
 
@@ -611,7 +624,7 @@ The progress of the cancellation has to be monitored by the user at
 
 The request ID is returned by the trigger result.
 
-###### In-Progress
+####### In-Progress
 
 {% highlight json %}
 {
@@ -620,7 +633,7 @@ The request ID is returned by the trigger result.
 }
 {% endhighlight %}
 
-###### Success
+####### Success
 
 {% highlight json %}
 {
@@ -632,7 +645,7 @@ The request ID is returned by the trigger result.
 
 The `savepointPath` points to the external path of the savepoint, which can be used to resume the savepoint.
 
-###### Failed
+####### Failed
 
 {% highlight json %}
 {
@@ -642,11 +655,11 @@ The `savepointPath` points to the external path of the savepoint, which can be u
 }
 {% endhighlight %}
 
-### Submitting Programs
+#### Submitting Programs
 
 It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.
 
-#### Upload a new JAR file
+##### Upload a new JAR file
 
 Send a `POST` request to `/jars/upload` with your jar file sent as multi-part data under the `jarfile` file.
 Also make sure that the multi-part data includes the `Content-Type` of the file itself, some http libraries do not add the header by default.
@@ -659,7 +672,7 @@ Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar"
 Content-Type: application/x-java-archive
 {% endhighlight %}
 
-#### Run a Program (POST)
+##### Run a Program (POST)
 
 Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key `jobmanager.web.upload.dir`).
 
@@ -688,3 +701,7 @@ Response:
 {% endhighlight %}
 
 {% top %}
+</div>
+
+</div>
+
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 82fdeec7cf6..4df1d6ee71b 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -37,6 +37,7 @@
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -123,13 +124,24 @@
 	public static void main(String[] args) throws IOException {
 		String outputDirectory = args[0];
 
-		createHtmlFile(new DocumentingDispatcherRestEndpoint(), Paths.get(outputDirectory, "rest_dispatcher.html"));
+		for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
+			if (apiVersion == RestAPIVersion.V0) {
+				// this version exists only for testing purposes
+				continue;
+			}
+			createHtmlFile(
+				new DocumentingDispatcherRestEndpoint(),
+				apiVersion,
+				Paths.get(outputDirectory, "rest_" + apiVersion.getURLVersionPrefix() + "_dispatcher.html"));
+		}
 	}
 
-	private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, Path outputFile) throws IOException {
+	private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile) throws IOException {
 		StringBuilder html = new StringBuilder();
 
-		List<MessageHeaders> specs = restEndpoint.getSpecs();
+		List<MessageHeaders> specs = restEndpoint.getSpecs().stream()
+			.filter(spec -> spec.getSupportedAPIVersions().contains(apiVersion))
+			.collect(Collectors.toList());
 		specs.forEach(spec -> html.append(createHtmlEntry(spec)));
 
 		Files.deleteIfExists(outputFile);
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index b0c2102ed4f..d8542d69587 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -28,6 +28,8 @@
 
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -57,6 +59,7 @@
 import java.nio.file.Files;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Collections;
 import java.util.Date;
 import java.util.Locale;
 
@@ -158,7 +161,12 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str
 				} finally {
 					if (!success) {
 						LOG.debug("Unable to load requested file {} from classloader", pathToLoad);
-						StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+						HandlerUtils.sendErrorResponse(
+							ctx,
+							request,
+							new ErrorResponseBody("File not found."),
+							NOT_FOUND,
+							Collections.emptyMap());
 						return;
 					}
 				}
@@ -166,12 +174,22 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str
 		}
 
 		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
-			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				Collections.emptyMap());
 			return;
 		}
 
 		if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
-			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				Collections.emptyMap());
 			return;
 		}
 
@@ -204,7 +222,12 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str
 		try {
 			raf = new RandomAccessFile(file, "r");
 		} catch (FileNotFoundException e) {
-			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				Collections.emptyMap());
 			return;
 		}
 
@@ -244,7 +267,12 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str
 		} catch (Exception e) {
 			raf.close();
 			LOG.error("Failed to serve file.", e);
-			StaticFileServerHandler.sendError(ctx, INTERNAL_SERVER_ERROR);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("Internal server error."),
+				INTERNAL_SERVER_ERROR,
+				Collections.emptyMap());
 		}
 	}
 
@@ -252,7 +280,12 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 		if (ctx.channel().isActive()) {
 			LOG.error("Caught exception", cause);
-			StaticFileServerHandler.sendError(ctx, INTERNAL_SERVER_ERROR);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				false,
+				new ErrorResponseBody("Internal server error."),
+				INTERNAL_SERVER_ERROR,
+				Collections.emptyMap());
 		}
 	}
 }
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
index b08504d0758..19a3d52da58 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
@@ -30,6 +30,8 @@
 
 import java.io.File;
 
+import static org.hamcrest.CoreMatchers.containsString;
+
 /**
  * Tests for the HistoryServerStaticFileServerHandler.
  */
@@ -56,7 +58,7 @@ public void testRespondWithFile() throws Exception {
 		try {
 			// verify that 404 message is returned when requesting a non-existent file
 			String notFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/hello");
-			Assert.assertTrue(notFound404.contains("404 Not Found"));
+			Assert.assertThat(notFound404, containsString("not found"));
 
 			// verify that a) a file can be loaded using the ClassLoader and b) that the HistoryServer
 			// index_hs.html is injected
@@ -71,12 +73,12 @@ public void testRespondWithFile() throws Exception {
 			File dir = new File(webDir, "dir.json");
 			dir.mkdirs();
 			String dirNotFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/dir");
-			Assert.assertTrue(dirNotFound404.contains("404 Not Found"));
+			Assert.assertTrue(dirNotFound404.contains("not found"));
 
 			// verify that a 404 message is returned when requesting a file outside the webDir
 			tmp.newFile("secret");
 			String x = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/../secret");
-			Assert.assertTrue(x.contains("404 Not Found"));
+			Assert.assertTrue(x.contains("not found"));
 		} finally {
 			webUI.shutdown();
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2e9de4c168d..a8557496b08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -85,6 +86,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
 
@@ -173,6 +175,24 @@ public void shutdown(Time timeout) {
 			U messageParameters,
 			R request,
 			Collection<FileUpload> fileUploads) throws IOException {
+		return sendRequest(
+			targetAddress,
+			targetPort,
+			messageHeaders,
+			messageParameters,
+			request,
+			fileUploads,
+			RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
+	}
+
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+			String targetAddress,
+			int targetPort,
+			M messageHeaders,
+			U messageParameters,
+			R request,
+			Collection<FileUpload> fileUploads,
+			RestAPIVersion apiVersion) throws IOException {
 		Preconditions.checkNotNull(targetAddress);
 		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
 		Preconditions.checkNotNull(messageHeaders);
@@ -181,7 +201,17 @@ public void shutdown(Time timeout) {
 		Preconditions.checkNotNull(fileUploads);
 		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
 
-		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+		if (!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) {
+			throw new IllegalArgumentException(String.format(
+				"The requested version %s is not supported by the request (method=%s URL=%s). Supported versions are: %s.",
+				apiVersion,
+				messageHeaders.getHttpMethod(),
+				messageHeaders.getTargetRestEndpointURL(),
+				messageHeaders.getSupportedAPIVersions().stream().map(RestAPIVersion::getURLVersionPrefix).collect(Collectors.joining(","))));
+		}
+
+		String versionedHandlerURL = "/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
+		String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
 
 		LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
 		// serialize payload
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index e836e357b5d..28af072a10c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
@@ -144,8 +145,7 @@ public final void start() throws Exception {
 				RestHandlerUrlComparator.INSTANCE);
 
 			handlers.forEach(handler -> {
-				log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
-				registerHandler(router, handler);
+				registerHandler(router, handler, log);
 			});
 
 			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -364,22 +364,37 @@ public String getRestBaseUrl() {
 		}
 	}
 
-	private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
-		switch (specificationHandler.f0.getHttpMethod()) {
+	private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler, Logger log) {
+		final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL();
+		// setup versioned urls
+		for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) {
+			final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL;
+			log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL);
+			registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+			if (supportedVersion.isDefaultVersion()) {
+				// setup unversioned url for convenience and backwards compatibility
+				log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL);
+				registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+			}
+		}
+	}
+
+	private static void registerHandler(Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
+		switch (httpMethod) {
 			case GET:
-				router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addGet(handlerURL, handler);
 				break;
 			case POST:
-				router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addPost(handlerURL, handler);
 				break;
 			case DELETE:
-				router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addDelete(handlerURL, handler);
 				break;
 			case PATCH:
-				router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addPatch(handlerURL, handler);
 				break;
 			default:
-				throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.');
+				throw new RuntimeException("Unsupported http method: " + httpMethod + '.');
 		}
 	}
 
@@ -437,13 +452,22 @@ private static synchronized void checkAndCreateUploadDir(final Path uploadDir, f
 
 		private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
 
+		private static final Comparator<RestAPIVersion> API_VERSION_ORDER = new RestAPIVersion.RestAPIVersionComparator();
+
 		static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();
 
 		@Override
 		public int compare(
 				Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1,
 				Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) {
-			return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+			final int urlComparisonResult = CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+			if (urlComparisonResult != 0) {
+				return urlComparisonResult;
+			} else {
+				return API_VERSION_ORDER.compare(
+					Collections.min(o1.f0.getSupportedAPIVersions()),
+					Collections.min(o2.f0.getSupportedAPIVersions()));
+			}
 		}
 
 		/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
index 4ebcd49c159..656167974b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -19,6 +19,10 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Rest handler interface which all rest handler implementation have to implement.
@@ -38,4 +42,13 @@
 	 * @return endpoint url that this request should be sent to
 	 */
 	String getTargetRestEndpointURL();
+
+	/**
+	 * Returns the supported API versions that this request supports.
+	 *
+	 * @return Collection of supported API versions
+	 */
+	default Collection<RestAPIVersion> getSupportedAPIVersions() {
+		return Collections.singleton(RestAPIVersion.V1);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
index 62b94e50bc8..969945f749d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -29,11 +29,12 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.handler.util.MimeTypes;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -50,7 +51,6 @@
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
-import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -184,7 +184,12 @@ private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, St
 				} finally {
 					if (!success) {
 						logger.debug("Unable to load requested file {} from classloader", requestPath);
-						sendError(ctx, NOT_FOUND);
+						HandlerUtils.sendErrorResponse(
+							ctx,
+							request,
+							new ErrorResponseBody(String.format("Unable to load requested file %s.", requestPath)),
+							NOT_FOUND,
+							responseHeaders);
 						return;
 					}
 				}
@@ -192,12 +197,22 @@ private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, St
 		}
 
 		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
-			sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				responseHeaders);
 			return;
 		}
 
 		if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
-			sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				NOT_FOUND,
+				responseHeaders);
 			return;
 		}
 
@@ -231,7 +246,12 @@ private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, St
 			raf = new RandomAccessFile(file, "r");
 		}
 		catch (FileNotFoundException e) {
-			sendError(ctx, NOT_FOUND);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("File not found."),
+				HttpResponseStatus.NOT_FOUND,
+				responseHeaders);
 			return;
 		}
 
@@ -271,7 +291,12 @@ private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, St
 		} catch (Exception e) {
 			raf.close();
 			logger.error("Failed to serve file.", e);
-			sendError(ctx, INTERNAL_SERVER_ERROR);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				request,
+				new ErrorResponseBody("Internal server error."),
+				INTERNAL_SERVER_ERROR,
+				responseHeaders);
 		}
 	}
 
@@ -279,7 +304,12 @@ private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, St
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 		if (ctx.channel().isActive()) {
 			logger.error("Caught exception", cause);
-			sendError(ctx, INTERNAL_SERVER_ERROR);
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				false,
+				new ErrorResponseBody("Internal server error."),
+				INTERNAL_SERVER_ERROR,
+				Collections.emptyMap());
 		}
 	}
 
@@ -287,21 +317,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 	//  Utilities to encode headers and responses
 	// ------------------------------------------------------------------------
 
-	/**
-	 * Writes a simple  error response message.
-	 *
-	 * @param ctx    The channel context to write the response to.
-	 * @param status The response status.
-	 */
-	public static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-		FullHttpResponse response = new DefaultFullHttpResponse(
-				HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
-		response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
-
-		// close the connection as soon as the error message is sent.
-		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
-	}
-
 	/**
 	 * Send the "304 Not Modified" response. This response can be used when the
 	 * file timestamp is the same as what the browser is sending up.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
new file mode 100644
index 00000000000..d6305635ec0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * An enum for all versions of the REST API.
+ *
+ * <p>REST API versions are global and thus apply to every REST component.
+ *
+ * <p>Changes that must result in an API version increment include but are not limited to:
+ * - modification of a handler url
+ * - addition of new mandatory parameters
+ * - removal of a handler/request
+ * - modifications to request/response bodies (excluding additions)
+ */
+public enum RestAPIVersion {
+	V0(0, false), // strictly for testing purposes
+	V1(1, true);
+
+	private final int versionNumber;
+
+	private final boolean isDefaultVersion;
+
+	RestAPIVersion(int versionNumber, boolean isDefaultVersion) {
+		this.versionNumber = versionNumber;
+		this.isDefaultVersion = isDefaultVersion;
+	}
+
+	/**
+	 * Returns the URL version prefix (e.g. "v1") for this version.
+	 *
+	 * @return URL version prefix
+	 */
+	public String getURLVersionPrefix() {
+		return name().toLowerCase();
+	}
+
+	/**
+	 * Returns whether this version is the default REST API version.
+	 *
+	 * @return whether this version is the default
+	 */
+	public boolean isDefaultVersion() {
+		return isDefaultVersion;
+	}
+
+	/**
+	 * Converts the given URL version prefix (e.g "v1") to a {@link RestAPIVersion}.
+	 *
+	 * @param prefix prefix to convert
+	 * @return REST API version matching the prefix
+	 * @throws IllegalArgumentException if the prefix doesn't match any version
+	 */
+	public static RestAPIVersion fromURLVersionPrefix(String prefix) {
+		return valueOf(prefix.toUpperCase());
+	}
+
+	/**
+	 * Returns the latest version from the given collection.
+	 *
+	 * @param versions possible candidates
+	 * @return latest version
+	 */
+	public static RestAPIVersion getLatestVersion(Collection<RestAPIVersion> versions) {
+		return Collections.max(versions, new RestAPIVersionComparator());
+	}
+
+	/**
+	 * Comparator for {@link RestAPIVersion} that sorts versions based on their version number, i.e. oldest to latest.
+	 */
+	public static class RestAPIVersionComparator implements Comparator<RestAPIVersion> {
+
+		@Override
+		public int compare(RestAPIVersion o1, RestAPIVersion o2) {
+			return Integer.compare(o1.versionNumber, o2.versionNumber);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 209f2d1d351..22cd6f62063 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -25,14 +25,18 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -45,12 +49,13 @@
  */
 public class RestClientTest extends TestLogger {
 
+	private static final String unroutableIp = "10.255.255.1";
+
 	@Test
 	public void testConnectionTimeout() throws Exception {
 		final Configuration config = new Configuration();
 		config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
 		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
-		final String unroutableIp = "10.255.255.1";
 		try {
 			restClient.sendRequest(
 				unroutableIp,
@@ -66,6 +71,27 @@ public void testConnectionTimeout() throws Exception {
 		}
 	}
 
+	@Test
+	public void testInvalidVersionRejection() throws Exception {
+		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor());
+
+		try {
+			CompletableFuture<EmptyResponseBody> invalidVersionResponse = restClient.sendRequest(
+				unroutableIp,
+				80,
+				new TestMessageHeaders(),
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				Collections.emptyList(),
+				RestAPIVersion.V0
+			);
+			Assert.fail("The request should have been rejected due to a version mismatch.");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+
+	}
+
 	private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 31f78e36cf8..b017610aa3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -56,8 +57,12 @@
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
 import org.apache.commons.io.IOUtils;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -85,6 +90,7 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
@@ -184,6 +190,21 @@ public void setup() throws Exception {
 			mockGatewayRetriever,
 			RpcUtils.INF_TIMEOUT);
 
+		TestVersionHandler testVersionHandler = new TestVersionHandler(
+			CompletableFuture.completedFuture(restAddress),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		TestVersionSelectionHandler1 testVersionSelectionHandler1 = new TestVersionSelectionHandler1(
+			CompletableFuture.completedFuture(restAddress),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		TestVersionSelectionHandler2 testVersionSelectionHandler2 = new TestVersionSelectionHandler2(
+			CompletableFuture.completedFuture(restAddress),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
 		testUploadHandler = new TestUploadHandler(
 			CompletableFuture.completedFuture(restAddress),
 			mockGatewayRetriever,
@@ -198,6 +219,9 @@ public void setup() throws Exception {
 		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
 			Tuple2.of(new TestHeaders(), testHandler),
 			Tuple2.of(TestUploadHeaders.INSTANCE, testUploadHandler),
+			Tuple2.of(testVersionHandler.getMessageHeaders(), testVersionHandler),
+			Tuple2.of(testVersionSelectionHandler1.getMessageHeaders(), testVersionSelectionHandler1),
+			Tuple2.of(testVersionSelectionHandler2.getMessageHeaders(), testVersionSelectionHandler2),
 			Tuple2.of(WebContentHandlerSpecification.getInstance(), staticFileServerHandler));
 
 		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
@@ -415,6 +439,88 @@ public void testStaticFileServerHandler() throws Exception {
 		assertEquals("foobar", fileContents.trim());
 	}
 
+	@Test
+	public void testVersioning() throws Exception {
+		CompletableFuture<EmptyResponseBody> unspecifiedVersionResponse = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionHeaders.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList()
+		);
+
+		unspecifiedVersionResponse.get(5, TimeUnit.SECONDS);
+
+		CompletableFuture<EmptyResponseBody> specifiedVersionResponse = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionHeaders.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList(),
+			RestAPIVersion.V1
+		);
+
+		specifiedVersionResponse.get(5, TimeUnit.SECONDS);
+	}
+
+	@Test
+	public void testVersionSelection() throws Exception {
+		CompletableFuture<EmptyResponseBody> version1Response = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionSelectionHeaders1.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList(),
+			RestAPIVersion.V0
+		);
+
+		try {
+			version1Response.get(5, TimeUnit.SECONDS);
+			fail();
+		} catch (ExecutionException ee) {
+			RestClientException rce = (RestClientException) ee.getCause();
+			assertEquals(HttpResponseStatus.OK, rce.getHttpResponseStatus());
+		}
+
+		CompletableFuture<EmptyResponseBody> version2Response = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			TestVersionSelectionHeaders2.INSTANCE,
+			EmptyMessageParameters.getInstance(),
+			EmptyRequestBody.getInstance(),
+			Collections.emptyList(),
+			RestAPIVersion.V1
+		);
+
+		try {
+			version2Response.get(5, TimeUnit.SECONDS);
+			fail();
+		} catch (ExecutionException ee) {
+			RestClientException rce = (RestClientException) ee.getCause();
+			assertEquals(HttpResponseStatus.ACCEPTED, rce.getHttpResponseStatus());
+		}
+	}
+
+	@Test
+	public void testDefaultVersionRouting() throws Exception {
+		Assume.assumeFalse(
+			"Ignoring SSL-enabled test to keep OkHttp usage simple.",
+			config.getBoolean(SecurityOptions.SSL_REST_ENABLED));
+
+		OkHttpClient client = new OkHttpClient();
+
+		final Request request = new Request.Builder()
+			.url(serverEndpoint.getRestBaseUrl() + TestVersionSelectionHeaders2.INSTANCE.getTargetRestEndpointURL())
+			.build();
+
+		try (final Response response = client.newCall(request).execute()) {
+			assertEquals(HttpResponseStatus.ACCEPTED.code(), response.code());
+		}
+	}
+
 	private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
 		final HttpURLConnection connection =
 			(HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -697,6 +803,151 @@ private TestUploadHandler(
 		}
 	}
 
+	private static class TestVersionHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private TestVersionHandler(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout) {
+			super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionHeaders.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+	}
+
+	private enum TestVersionHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+		INSTANCE;
+
+		@Override
+		public Class<EmptyRequestBody> getRequestClass() {
+			return EmptyRequestBody.class;
+		}
+
+		@Override
+		public HttpMethodWrapper getHttpMethod() {
+			return HttpMethodWrapper.GET;
+		}
+
+		@Override
+		public String getTargetRestEndpointURL() {
+			return "/test/versioning";
+		}
+
+		@Override
+		public Class<EmptyResponseBody> getResponseClass() {
+			return EmptyResponseBody.class;
+		}
+
+		@Override
+		public HttpResponseStatus getResponseStatusCode() {
+			return HttpResponseStatus.OK;
+		}
+
+		@Override
+		public String getDescription() {
+			return null;
+		}
+
+		@Override
+		public EmptyMessageParameters getUnresolvedMessageParameters() {
+			return EmptyMessageParameters.getInstance();
+		}
+
+		@Override
+		public Collection<RestAPIVersion> getSupportedAPIVersions() {
+			return Collections.singleton(RestAPIVersion.V1);
+		}
+	}
+
+	private interface TestVersionSelectionHeadersBase extends MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		@Override
+		default Class<EmptyRequestBody> getRequestClass() {
+			return EmptyRequestBody.class;
+		}
+
+		@Override
+		default HttpMethodWrapper getHttpMethod() {
+			return HttpMethodWrapper.GET;
+		}
+
+		@Override
+		default String getTargetRestEndpointURL() {
+			return "/test/select-version";
+		}
+
+		@Override
+		default Class<EmptyResponseBody> getResponseClass() {
+			return EmptyResponseBody.class;
+		}
+
+		@Override
+		default HttpResponseStatus getResponseStatusCode() {
+			return HttpResponseStatus.OK;
+		}
+
+		@Override
+		default String getDescription() {
+			return null;
+		}
+
+		@Override
+		default EmptyMessageParameters getUnresolvedMessageParameters() {
+			return EmptyMessageParameters.getInstance();
+		}
+	}
+
+	private enum TestVersionSelectionHeaders1 implements TestVersionSelectionHeadersBase {
+		INSTANCE;
+
+		@Override
+		public Collection<RestAPIVersion> getSupportedAPIVersions() {
+			return Collections.singleton(RestAPIVersion.V0);
+		}
+	}
+
+	private enum TestVersionSelectionHeaders2 implements TestVersionSelectionHeadersBase {
+		INSTANCE;
+
+		@Override
+		public Collection<RestAPIVersion> getSupportedAPIVersions() {
+			return Collections.singleton(RestAPIVersion.V1);
+		}
+	}
+
+	private static class TestVersionSelectionHandler1 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private TestVersionSelectionHandler1(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout) {
+			super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders1.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			throw new RestHandlerException("test failure 1", HttpResponseStatus.OK);
+		}
+	}
+
+	private static class TestVersionSelectionHandler2 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+		private TestVersionSelectionHandler2(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout) {
+			super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders2.INSTANCE);
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+			throw new RestHandlerException("test failure 2", HttpResponseStatus.ACCEPTED);
+		}
+	}
+
 	private enum TestUploadHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 		INSTANCE;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
new file mode 100644
index 00000000000..4f60da1716e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for {@link RestAPIVersion}.
+ */
+public class RestAPIVersionTest extends TestLogger {
+	@Test
+	public void testGetLatest() {
+		Collection<RestAPIVersion> candidates = Arrays.asList(RestAPIVersion.V0, RestAPIVersion.V1);
+		Assert.assertEquals(RestAPIVersion.V1, RestAPIVersion.getLatestVersion(candidates));
+	}
+
+	@Test
+	public void testSingleDefaultVersion() {
+		final List<RestAPIVersion> defaultVersions = Arrays.stream(RestAPIVersion.values())
+			.filter(RestAPIVersion::isDefaultVersion)
+			.collect(Collectors.toList());
+
+		Assert.assertEquals(
+			"Only one RestAPIVersion should be marked as the default. Defaults: " + defaultVersions,
+			1,
+			defaultVersions.size());
+	}
+}
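
For readers skimming the diff, here is a minimal standalone sketch of the prefix parsing and latest-version selection that RestAPIVersion performs above. It is an illustration only, not the merged code: the names ApiVersionSketch, ApiVersionSketchDemo, urlPrefix, fromUrlPrefix and latest are hypothetical, and the use of Locale.ROOT is an assumption of the sketch (the diff calls toUpperCase() without a locale).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Locale;

// Hypothetical stand-in for the RestAPIVersion enum in the diff; not Flink's actual class.
enum ApiVersionSketch {
	V0(0),
	V1(1);

	private final int versionNumber;

	ApiVersionSketch(int versionNumber) {
		this.versionNumber = versionNumber;
	}

	// URL prefix of the form v[version_number], e.g. "v1".
	String urlPrefix() {
		return "v" + versionNumber;
	}

	// Maps a URL prefix such as "v1" back to its constant.
	// Enum.valueOf throws IllegalArgumentException for unknown prefixes.
	static ApiVersionSketch fromUrlPrefix(String prefix) {
		// Locale.ROOT (an assumption of this sketch) keeps the conversion locale-independent.
		return valueOf(prefix.toUpperCase(Locale.ROOT));
	}

	// Picks the candidate with the highest version number.
	// Collections.max throws NoSuchElementException for an empty collection.
	static ApiVersionSketch latest(Collection<ApiVersionSketch> versions) {
		return Collections.max(
			versions,
			Comparator.comparingInt((ApiVersionSketch v) -> v.versionNumber));
	}
}

class ApiVersionSketchDemo {
	public static void main(String[] args) {
		ApiVersionSketch parsed = ApiVersionSketch.fromUrlPrefix("v1");
		ApiVersionSketch newest = ApiVersionSketch.latest(
			Arrays.asList(ApiVersionSketch.V0, ApiVersionSketch.V1));
		System.out.println(parsed.urlPrefix() + " " + newest.urlPrefix());
	}
}
```

Ordering by the numeric version field rather than by enum declaration order mirrors the RestAPIVersionComparator in the diff, so the result does not depend on how the constants happen to be listed.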


 
