Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 08:33:53 UTC
[flink] 02/02: [FLINK-7551][rest] Add versioning
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 14ae6e5df54d0855e414270c5c107839d0ee1c76
Author: zentol <ch...@apache.org>
AuthorDate: Wed Aug 22 12:04:29 2018 +0200
[FLINK-7551][rest] Add versioning
---
...est_dispatcher.html => rest_v1_dispatcher.html} | 0
docs/monitoring/rest_api.md | 51 +++--
.../flink/docs/rest/RestAPIDocGenerator.java | 18 +-
.../HistoryServerStaticFileServerHandler.java | 2 +-
.../HistoryServerStaticFileServerHandlerTest.java | 8 +-
.../org/apache/flink/runtime/rest/RestClient.java | 32 ++-
.../flink/runtime/rest/RestServerEndpoint.java | 44 +++-
.../rest/handler/RestHandlerSpecification.java | 13 ++
.../runtime/rest/versioning/RestAPIVersion.java | 98 ++++++++
.../apache/flink/runtime/rest/RestClientTest.java | 28 ++-
.../runtime/rest/RestServerEndpointITCase.java | 251 +++++++++++++++++++++
.../rest/versioning/RestAPIVersionTest.java | 52 +++++
12 files changed, 561 insertions(+), 36 deletions(-)
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html
similarity index 100%
rename from docs/_includes/generated/rest_dispatcher.html
rename to docs/_includes/generated/rest_v1_dispatcher.html
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index eefc8b9..85ec0ac 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -52,13 +52,26 @@ To add new requests, one needs to
A good example is the `org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler` that uses the `org.apache.flink.runtime.rest.messages.JobExceptionsHeaders`.
-## Available Requests
+## API
-### Dispatcher
+The REST API is versioned; a specific version can be queried by prefixing the URL with the version prefix. Prefixes are always of the form `v[version_number]`.
+For example, to access version 1 of `/foo/bar` one would query `/v1/foo/bar`.
-{% include generated/rest_dispatcher.html %}
+If no version is specified, Flink will default to the *oldest* version supporting the request.
-## Legacy
+Querying unsupported/non-existing versions will return a 404 error.
+
+<span class="label label-danger">Attention</span> REST API versioning is *not* active if the cluster runs in [legacy mode](../ops/config.html#mode). For this case please refer to the legacy API below.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="v1" markdown="1">
+#### Dispatcher
+
+{% include generated/rest_v1_dispatcher.html %}
+</div>
+
+<div data-lang="legacy" markdown="1">
This section is only relevant if the cluster runs in [legacy mode](../ops/config.html#mode).
@@ -90,7 +103,7 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<
- `/jars/:jarid/run`
-### General
+#### General
**`/config`**
@@ -126,7 +139,7 @@ Sample Result:
}
{% endhighlight %}
-### Overview of Jobs
+#### Overview of Jobs
**`/jobs/overview`**
@@ -163,7 +176,7 @@ Sample Result:
}
{% endhighlight %}
-### Details of a Running or Completed Job
+#### Details of a Running or Completed Job
**`/jobs/<jobid>`**
@@ -573,15 +586,15 @@ Sample Result:
}
{% endhighlight %}
-### Job Cancellation
+#### Job Cancellation
-#### Cancel Job
+##### Cancel Job
`DELETE` request to **`/jobs/:jobid/cancel`**.
Triggers job cancellation, result on success is `{}`.
-#### Cancel Job with Savepoint
+##### Cancel Job with Savepoint
Triggers a savepoint and cancels the job after the savepoint succeeds.
@@ -601,7 +614,7 @@ Sample Trigger Result:
}
{% endhighlight %}
-##### Monitoring Progress
+###### Monitoring Progress
The progress of the cancellation has to be monitored by the user at
@@ -611,7 +624,7 @@ The progress of the cancellation has to be monitored by the user at
The request ID is returned by the trigger result.
-###### In-Progress
+####### In-Progress
{% highlight json %}
{
@@ -620,7 +633,7 @@ The request ID is returned by the trigger result.
}
{% endhighlight %}
-###### Success
+####### Success
{% highlight json %}
{
@@ -632,7 +645,7 @@ The request ID is returned by the trigger result.
The `savepointPath` points to the external path of the savepoint, which can be used to resume the savepoint.
-###### Failed
+####### Failed
{% highlight json %}
{
@@ -642,11 +655,11 @@ The `savepointPath` points to the external path of the savepoint, which can be u
}
{% endhighlight %}
-### Submitting Programs
+#### Submitting Programs
It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.
-#### Upload a new JAR file
+##### Upload a new JAR file
Send a `POST` request to `/jars/upload` with your jar file sent as multi-part data under the `jarfile` file.
Also make sure that the multi-part data includes the `Content-Type` of the file itself, some http libraries do not add the header by default.
@@ -659,7 +672,7 @@ Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar"
Content-Type: application/x-java-archive
{% endhighlight %}
-#### Run a Program (POST)
+##### Run a Program (POST)
Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key `web.upload.dir`).
@@ -688,3 +701,7 @@ Response:
{% endhighlight %}
{% top %}
+</div>
+
+</div>
+
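Reviewer note: the documentation change above describes the prefix rule (`v[version_number]`, so version 1 of `/foo/bar` is `/v1/foo/bar`). A minimal sketch of that rule follows. `VersionedPathSketch` and `versionedPath` are hypothetical names chosen for illustration and are not part of Flink or of this commit.

```java
// Hypothetical helper for illustration only; not part of Flink.
final class VersionedPathSketch {

    /**
     * Prefixes a REST path with "/v" + versionNumber,
     * following the "v[version_number]" rule from the docs.
     */
    static String versionedPath(int versionNumber, String path) {
        if (versionNumber < 0) {
            throw new IllegalArgumentException("Version number must not be negative: " + versionNumber);
        }
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("Path must start with '/': " + path);
        }
        return "/v" + versionNumber + path;
    }

    public static void main(String[] args) {
        // Mirrors the example in the docs: version 1 of /foo/bar.
        System.out.println(versionedPath(1, "/foo/bar")); // prints "/v1/foo/bar"
    }
}
```

According to the documentation text in this commit, requesting a version that a handler does not support yields a 404, and omitting the prefix falls back to the oldest supporting version.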
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 82fdeec..4df1d6e 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -123,13 +124,24 @@ public class RestAPIDocGenerator {
public static void main(String[] args) throws IOException {
String outputDirectory = args[0];
- createHtmlFile(new DocumentingDispatcherRestEndpoint(), Paths.get(outputDirectory, "rest_dispatcher.html"));
+ for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
+ if (apiVersion == RestAPIVersion.V0) {
+ // this version exists only for testing purposes
+ continue;
+ }
+ createHtmlFile(
+ new DocumentingDispatcherRestEndpoint(),
+ apiVersion,
+ Paths.get(outputDirectory, "rest_" + apiVersion.getURLVersionPrefix() + "_dispatcher.html"));
+ }
}
- private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, Path outputFile) throws IOException {
+ private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile) throws IOException {
StringBuilder html = new StringBuilder();
- List<MessageHeaders> specs = restEndpoint.getSpecs();
+ List<MessageHeaders> specs = restEndpoint.getSpecs().stream()
+ .filter(spec -> spec.getSupportedAPIVersions().contains(apiVersion))
+ .collect(Collectors.toList());
specs.forEach(spec -> html.append(createHtmlEntry(spec)));
Files.deleteIfExists(outputFile);
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index 2042088..d8542d6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -164,7 +164,7 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
HandlerUtils.sendErrorResponse(
ctx,
request,
- new ErrorResponseBody(String.format("Unable to load requested file %s.", requestPath)),
+ new ErrorResponseBody("File not found."),
NOT_FOUND,
Collections.emptyMap());
return;
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
index b08504d..19a3d52 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
+import static org.hamcrest.CoreMatchers.containsString;
+
/**
* Tests for the HistoryServerStaticFileServerHandler.
*/
@@ -56,7 +58,7 @@ public class HistoryServerStaticFileServerHandlerTest {
try {
// verify that 404 message is returned when requesting a non-existent file
String notFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/hello");
- Assert.assertTrue(notFound404.contains("404 Not Found"));
+ Assert.assertThat(notFound404, containsString("not found"));
// verify that a) a file can be loaded using the ClassLoader and b) that the HistoryServer
// index_hs.html is injected
@@ -71,12 +73,12 @@ public class HistoryServerStaticFileServerHandlerTest {
File dir = new File(webDir, "dir.json");
dir.mkdirs();
String dirNotFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/dir");
- Assert.assertTrue(dirNotFound404.contains("404 Not Found"));
+ Assert.assertTrue(dirNotFound404.contains("not found"));
// verify that a 404 message is returned when requesting a file outside the webDir
tmp.newFile("secret");
String x = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/../secret");
- Assert.assertTrue(x.contains("404 Not Found"));
+ Assert.assertTrue(x.contains("not found"));
} finally {
webUI.shutdown();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2e9de4c..a855749 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -85,6 +86,7 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
@@ -173,6 +175,24 @@ public class RestClient {
U messageParameters,
R request,
Collection<FileUpload> fileUploads) throws IOException {
+ return sendRequest(
+ targetAddress,
+ targetPort,
+ messageHeaders,
+ messageParameters,
+ request,
+ fileUploads,
+ RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
+ }
+
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ Collection<FileUpload> fileUploads,
+ RestAPIVersion apiVersion) throws IOException {
Preconditions.checkNotNull(targetAddress);
Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range [0, 65536).");
Preconditions.checkNotNull(messageHeaders);
@@ -181,7 +201,17 @@ public class RestClient {
Preconditions.checkNotNull(fileUploads);
Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
- String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+ if (!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) {
+ throw new IllegalArgumentException(String.format(
+ "The requested version %s is not supported by the request (method=%s URL=%s). Supported versions are: %s.",
+ apiVersion,
+ messageHeaders.getHttpMethod(),
+ messageHeaders.getTargetRestEndpointURL(),
+ messageHeaders.getSupportedAPIVersions().stream().map(RestAPIVersion::getURLVersionPrefix).collect(Collectors.joining(","))));
+ }
+
+ String versionedHandlerURL = "/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
+ String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
// serialize payload
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index e836e35..28af072 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
@@ -144,8 +145,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
RestHandlerUrlComparator.INSTANCE);
handlers.forEach(handler -> {
- log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
- registerHandler(router, handler);
+ registerHandler(router, handler, log);
});
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -364,22 +364,37 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
}
}
- private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
- switch (specificationHandler.f0.getHttpMethod()) {
+ private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler, Logger log) {
+ final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL();
+ // setup versioned urls
+ for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) {
+ final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL;
+ log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL);
+ registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+ if (supportedVersion.isDefaultVersion()) {
+ // setup unversioned url for convenience and backwards compatibility
+ log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL);
+ registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+ }
+ }
+ }
+
+ private static void registerHandler(Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
+ switch (httpMethod) {
case GET:
- router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ router.addGet(handlerURL, handler);
break;
case POST:
- router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ router.addPost(handlerURL, handler);
break;
case DELETE:
- router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ router.addDelete(handlerURL, handler);
break;
case PATCH:
- router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ router.addPatch(handlerURL, handler);
break;
default:
- throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.');
+ throw new RuntimeException("Unsupported http method: " + httpMethod + '.');
}
}
@@ -437,13 +452,22 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
+ private static final Comparator<RestAPIVersion> API_VERSION_ORDER = new RestAPIVersion.RestAPIVersionComparator();
+
static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();
@Override
public int compare(
Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1,
Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) {
- return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+ final int urlComparisonResult = CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
+ if (urlComparisonResult != 0) {
+ return urlComparisonResult;
+ } else {
+ return API_VERSION_ORDER.compare(
+ Collections.min(o1.f0.getSupportedAPIVersions()),
+ Collections.min(o2.f0.getSupportedAPIVersions()));
+ }
}
/**
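Reviewer note: the `registerHandler` change above registers each handler once per supported version under `/<prefix><url>`, and additionally under the bare URL when the version is the default one. The sketch below reproduces only that URL computation without any Flink or Netty types. `RouteRegistrationSketch`, `ApiVersion` and `routesFor` are hypothetical stand-ins, with `ApiVersion` loosely modelled on the `RestAPIVersion` enum from this commit.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in types for illustration; not the Flink classes themselves.
final class RouteRegistrationSketch {

    /** Simplified model of the RestAPIVersion enum added by this commit. */
    enum ApiVersion {
        V0(false), // test-only version in the commit
        V1(true);  // default version

        private final boolean isDefault;

        ApiVersion(boolean isDefault) {
            this.isDefault = isDefault;
        }

        String urlPrefix() {
            return name().toLowerCase();
        }

        boolean isDefault() {
            return isDefault;
        }
    }

    /** Computes every URL under which a handler would be reachable. */
    static List<String> routesFor(String handlerUrl, Collection<ApiVersion> supportedVersions) {
        List<String> routes = new ArrayList<>();
        for (ApiVersion version : supportedVersions) {
            // versioned URL, e.g. /v1/test/versioning
            routes.add("/" + version.urlPrefix() + handlerUrl);
            if (version.isDefault()) {
                // unversioned URL for convenience and backwards compatibility
                routes.add(handlerUrl);
            }
        }
        return routes;
    }

    public static void main(String[] args) {
        System.out.println(routesFor("/test/versioning", Collections.singleton(ApiVersion.V1)));
        System.out.println(routesFor("/test/select-version", Collections.singleton(ApiVersion.V0)));
    }
}
```

In this model a handler that supports only `V0` is reachable solely through its versioned URL, which matches the intent of keeping the test-only version off the unversioned route.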
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
index 4ebcd49..6561679 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -19,6 +19,10 @@
package org.apache.flink.runtime.rest.handler;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
/**
 * Rest handler interface which all rest handler implementations have to implement.
@@ -38,4 +42,13 @@ public interface RestHandlerSpecification {
* @return endpoint url that this request should be sent to
*/
String getTargetRestEndpointURL();
+
+ /**
+ * Returns the API versions that this request supports.
+ *
+ * @return Collection of supported API versions
+ */
+ default Collection<RestAPIVersion> getSupportedAPIVersions() {
+ return Collections.singleton(RestAPIVersion.V1);
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
new file mode 100644
index 0000000..d630563
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * An enum for all versions of the REST API.
+ *
+ * <p>REST API versions are global and thus apply to every REST component.
+ *
+ * <p>Changes that must result in an API version increment include but are not limited to:
+ * - modification of a handler url
+ * - addition of new mandatory parameters
+ * - removal of a handler/request
+ * - modifications to request/response bodies (excluding additions)
+ */
+public enum RestAPIVersion {
+ V0(0, false), // strictly for testing purposes
+ V1(1, true);
+
+ private final int versionNumber;
+
+ private final boolean isDefaultVersion;
+
+ RestAPIVersion(int versionNumber, boolean isDefaultVersion) {
+ this.versionNumber = versionNumber;
+ this.isDefaultVersion = isDefaultVersion;
+ }
+
+ /**
+ * Returns the URL version prefix (e.g. "v1") for this version.
+ *
+ * @return URL version prefix
+ */
+ public String getURLVersionPrefix() {
+ return name().toLowerCase();
+ }
+
+ /**
+ * Returns whether this version is the default REST API version.
+ *
+ * @return whether this version is the default
+ */
+ public boolean isDefaultVersion() {
+ return isDefaultVersion;
+ }
+
+ /**
+ * Converts the given URL version prefix (e.g. "v1") to a {@link RestAPIVersion}.
+ *
+ * @param prefix prefix to be converted
+ * @return REST API version matching the prefix
+ * @throws IllegalArgumentException if the prefix doesn't match any version
+ */
+ public static RestAPIVersion fromURLVersionPrefix(String prefix) {
+ return valueOf(prefix.toUpperCase());
+ }
+
+ /**
+ * Returns the latest version from the given collection.
+ *
+ * @param versions possible candidates
+ * @return latest version
+ */
+ public static RestAPIVersion getLatestVersion(Collection<RestAPIVersion> versions) {
+ return Collections.max(versions, new RestAPIVersionComparator());
+ }
+
+ /**
+ * Comparator for {@link RestAPIVersion} that sorts versions based on their version number, i.e. oldest to latest.
+ */
+ public static class RestAPIVersionComparator implements Comparator<RestAPIVersion> {
+
+ @Override
+ public int compare(RestAPIVersion o1, RestAPIVersion o2) {
+ return Integer.compare(o1.versionNumber, o2.versionNumber);
+ }
+ }
+}
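Reviewer note: the new enum resolves a URL prefix via `valueOf(prefix.toUpperCase())` and picks the latest version with `Collections.max` and a version-number comparator. The sketch below shows both lookups on a standalone enum. `VersionLookupSketch`, `ApiVersion`, `fromUrlPrefix` and `latest` are hypothetical names used for illustration, assuming the same two versions as in the commit.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;

// Hypothetical standalone model for illustration; not the Flink enum itself.
final class VersionLookupSketch {

    enum ApiVersion {
        V0(0),
        V1(1);

        final int versionNumber;

        ApiVersion(int versionNumber) {
            this.versionNumber = versionNumber;
        }

        /** Maps a prefix such as "v1" to a constant; unknown prefixes raise IllegalArgumentException. */
        static ApiVersion fromUrlPrefix(String prefix) {
            return valueOf(prefix.toUpperCase());
        }

        /** Returns the version with the highest version number; the collection must not be empty. */
        static ApiVersion latest(Collection<ApiVersion> versions) {
            return Collections.max(versions, Comparator.comparingInt((ApiVersion v) -> v.versionNumber));
        }
    }

    public static void main(String[] args) {
        System.out.println(ApiVersion.fromUrlPrefix("v1"));
        System.out.println(ApiVersion.latest(Arrays.asList(ApiVersion.V0, ApiVersion.V1)));
        try {
            ApiVersion.fromUrlPrefix("v9");
        } catch (IllegalArgumentException e) {
            System.out.println("unknown prefix rejected");
        }
    }
}
```

Note that `Collections.max` throws `NoSuchElementException` for an empty collection, so a caller in this model would need to guarantee at least one supported version.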
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 209f2d1..22cd6f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -25,14 +25,18 @@ import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -45,12 +49,13 @@ import static org.junit.Assert.assertThat;
*/
public class RestClientTest extends TestLogger {
+ private static final String unroutableIp = "10.255.255.1";
+
@Test
public void testConnectionTimeout() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
- final String unroutableIp = "10.255.255.1";
try {
restClient.sendRequest(
unroutableIp,
@@ -66,6 +71,27 @@ public class RestClientTest extends TestLogger {
}
}
+ @Test
+ public void testInvalidVersionRejection() throws Exception {
+ final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor());
+
+ try {
+ CompletableFuture<EmptyResponseBody> invalidVersionResponse = restClient.sendRequest(
+ unroutableIp,
+ 80,
+ new TestMessageHeaders(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList(),
+ RestAPIVersion.V0
+ );
+ Assert.fail("The request should have been rejected due to a version mismatch.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ }
+
private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 31f78e3..b017610 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -56,8 +57,12 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
import org.apache.commons.io.IOUtils;
import org.junit.After;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -85,6 +90,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -184,6 +190,21 @@ public class RestServerEndpointITCase extends TestLogger {
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT);
+ TestVersionHandler testVersionHandler = new TestVersionHandler(
+ CompletableFuture.completedFuture(restAddress),
+ mockGatewayRetriever,
+ RpcUtils.INF_TIMEOUT);
+
+ TestVersionSelectionHandler1 testVersionSelectionHandler1 = new TestVersionSelectionHandler1(
+ CompletableFuture.completedFuture(restAddress),
+ mockGatewayRetriever,
+ RpcUtils.INF_TIMEOUT);
+
+ TestVersionSelectionHandler2 testVersionSelectionHandler2 = new TestVersionSelectionHandler2(
+ CompletableFuture.completedFuture(restAddress),
+ mockGatewayRetriever,
+ RpcUtils.INF_TIMEOUT);
+
testUploadHandler = new TestUploadHandler(
CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever,
@@ -198,6 +219,9 @@ public class RestServerEndpointITCase extends TestLogger {
final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
Tuple2.of(new TestHeaders(), testHandler),
Tuple2.of(TestUploadHeaders.INSTANCE, testUploadHandler),
+ Tuple2.of(testVersionHandler.getMessageHeaders(), testVersionHandler),
+ Tuple2.of(testVersionSelectionHandler1.getMessageHeaders(), testVersionSelectionHandler1),
+ Tuple2.of(testVersionSelectionHandler2.getMessageHeaders(), testVersionSelectionHandler2),
Tuple2.of(WebContentHandlerSpecification.getInstance(), staticFileServerHandler));
serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
@@ -415,6 +439,88 @@ public class RestServerEndpointITCase extends TestLogger {
assertEquals("foobar", fileContents.trim());
}
+ @Test
+ public void testVersioning() throws Exception {
+ CompletableFuture<EmptyResponseBody> unspecifiedVersionResponse = restClient.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ TestVersionHeaders.INSTANCE,
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList()
+ );
+
+ unspecifiedVersionResponse.get(5, TimeUnit.SECONDS);
+
+ CompletableFuture<EmptyResponseBody> specifiedVersionResponse = restClient.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ TestVersionHeaders.INSTANCE,
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList(),
+ RestAPIVersion.V1
+ );
+
+ specifiedVersionResponse.get(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testVersionSelection() throws Exception {
+ CompletableFuture<EmptyResponseBody> version1Response = restClient.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ TestVersionSelectionHeaders1.INSTANCE,
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList(),
+ RestAPIVersion.V0
+ );
+
+ try {
+ version1Response.get(5, TimeUnit.SECONDS);
+ fail();
+ } catch (ExecutionException ee) {
+ RestClientException rce = (RestClientException) ee.getCause();
+ assertEquals(HttpResponseStatus.OK, rce.getHttpResponseStatus());
+ }
+
+ CompletableFuture<EmptyResponseBody> version2Response = restClient.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ TestVersionSelectionHeaders2.INSTANCE,
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList(),
+ RestAPIVersion.V1
+ );
+
+ try {
+ version2Response.get(5, TimeUnit.SECONDS);
+ fail();
+ } catch (ExecutionException ee) {
+ RestClientException rce = (RestClientException) ee.getCause();
+ assertEquals(HttpResponseStatus.ACCEPTED, rce.getHttpResponseStatus());
+ }
+ }
+
+ @Test
+ public void testDefaultVersionRouting() throws Exception {
+ Assume.assumeFalse(
+ "Ignoring SSL-enabled test to keep OkHttp usage simple.",
+ config.getBoolean(SecurityOptions.SSL_REST_ENABLED));
+
+ OkHttpClient client = new OkHttpClient();
+
+ final Request request = new Request.Builder()
+ .url(serverEndpoint.getRestBaseUrl() + TestVersionSelectionHeaders2.INSTANCE.getTargetRestEndpointURL())
+ .build();
+
+ try (final Response response = client.newCall(request).execute()) {
+ assertEquals(HttpResponseStatus.ACCEPTED.code(), response.code());
+ }
+ }
+
private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
final HttpURLConnection connection =
(HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -697,6 +803,151 @@ public class RestServerEndpointITCase extends TestLogger {
}
}
+ private static class TestVersionHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+ private TestVersionHandler(
+ final CompletableFuture<String> localRestAddress,
+ final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ final Time timeout) {
+ super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionHeaders.INSTANCE);
+ }
+
+ @Override
+ protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+ }
+ }
+
+ private enum TestVersionHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+ INSTANCE;
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/test/versioning";
+ }
+
+ @Override
+ public Class<EmptyResponseBody> getResponseClass() {
+ return EmptyResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return null;
+ }
+
+ @Override
+ public EmptyMessageParameters getUnresolvedMessageParameters() {
+ return EmptyMessageParameters.getInstance();
+ }
+
+ @Override
+ public Collection<RestAPIVersion> getSupportedAPIVersions() {
+ return Collections.singleton(RestAPIVersion.V1);
+ }
+ }
+
+ private interface TestVersionSelectionHeadersBase extends MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+ @Override
+ default Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ default HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ default String getTargetRestEndpointURL() {
+ return "/test/select-version";
+ }
+
+ @Override
+ default Class<EmptyResponseBody> getResponseClass() {
+ return EmptyResponseBody.class;
+ }
+
+ @Override
+ default HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ default String getDescription() {
+ return null;
+ }
+
+ @Override
+ default EmptyMessageParameters getUnresolvedMessageParameters() {
+ return EmptyMessageParameters.getInstance();
+ }
+ }
+
+ private enum TestVersionSelectionHeaders1 implements TestVersionSelectionHeadersBase {
+ INSTANCE;
+
+ @Override
+ public Collection<RestAPIVersion> getSupportedAPIVersions() {
+ return Collections.singleton(RestAPIVersion.V0);
+ }
+ }
+
+ private enum TestVersionSelectionHeaders2 implements TestVersionSelectionHeadersBase {
+ INSTANCE;
+
+ @Override
+ public Collection<RestAPIVersion> getSupportedAPIVersions() {
+ return Collections.singleton(RestAPIVersion.V1);
+ }
+ }
+
+ private static class TestVersionSelectionHandler1 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+ private TestVersionSelectionHandler1(
+ final CompletableFuture<String> localRestAddress,
+ final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ final Time timeout) {
+ super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders1.INSTANCE);
+ }
+
+ @Override
+ protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ throw new RestHandlerException("test failure 1", HttpResponseStatus.OK); // status doubles as a marker so the test can tell which handler ran
+ }
+ }
+
+ private static class TestVersionSelectionHandler2 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+ private TestVersionSelectionHandler2(
+ final CompletableFuture<String> localRestAddress,
+ final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ final Time timeout) {
+ super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders2.INSTANCE);
+ }
+
+ @Override
+ protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ throw new RestHandlerException("test failure 2", HttpResponseStatus.ACCEPTED); // status doubles as a marker so the test can tell which handler ran
+ }
+ }
+
private enum TestUploadHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
INSTANCE;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
new file mode 100644
index 0000000..4f60da1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for {@link RestAPIVersion}.
+ */
+public class RestAPIVersionTest extends TestLogger {
+ @Test
+ public void testGetLatest() {
+ Collection<RestAPIVersion> candidates = Arrays.asList(RestAPIVersion.V0, RestAPIVersion.V1);
+ Assert.assertEquals(RestAPIVersion.V1, RestAPIVersion.getLatestVersion(candidates));
+ }
+
+ @Test
+ public void testSingleDefaultVersion() {
+ final List<RestAPIVersion> defaultVersions = Arrays.stream(RestAPIVersion.values())
+ .filter(RestAPIVersion::isDefaultVersion)
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(
+ "Only one RestAPIVersion should be marked as the default. Defaults: " + defaultVersions,
+ 1,
+ defaultVersions.size());
+ }
+}