You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/26 09:11:33 UTC
[1/3] flink git commit: [FLINK-7040] [rest] Add basics for REST
communication
Repository: flink
Updated Branches:
refs/heads/master c384e52e6 -> bafddd798
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
new file mode 100644
index 0000000..e2ccfb5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * IT cases for {@link RestClientEndpoint} and {@link RestServerEndpoint}.
+ */
+public class RestEndpointITCase extends TestLogger {
+
+ private static final JobID PATH_JOB_ID = new JobID();
+ private static final JobID QUERY_JOB_ID = new JobID();
+ private static final String JOB_ID_KEY = "jobid";
+
+ @Test
+ public void testEndpoints() throws ConfigurationException, IOException, InterruptedException, ExecutionException {
+ Configuration config = new Configuration();
+
+ RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
+ RestClientEndpointConfiguration clientConfig = RestClientEndpointConfiguration.fromConfiguration(config);
+
+ RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig);
+ RestClientEndpoint clientEndpoint = new TestRestClientEndpoint(clientConfig);
+
+ try {
+ serverEndpoint.start();
+
+ TestParameters parameters = new TestParameters();
+ parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+ parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+
+ // send first request and wait until the handler blocks
+ CompletableFuture<TestResponse> response1;
+ synchronized (TestHandler.LOCK) {
+ response1 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(1));
+ TestHandler.LOCK.wait();
+ }
+
+ // send second request and verify response
+ CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(2));
+ Assert.assertEquals(2, response2.get().id);
+
+ // wake up blocked handler
+ synchronized (TestHandler.LOCK) {
+ TestHandler.LOCK.notifyAll();
+ }
+ // verify response to first request
+ Assert.assertEquals(1, response1.get().id);
+ } finally {
+ clientEndpoint.shutdown();
+ serverEndpoint.shutdown();
+ }
+ }
+
+ private static class TestRestServerEndpoint extends RestServerEndpoint {
+
+ TestRestServerEndpoint(RestServerEndpointConfiguration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ protected Collection<AbstractRestHandler<?, ?, ?>> initializeHandlers() {
+ return Collections.singleton(new TestHandler());
+ }
+ }
+
+ private static class TestHandler extends AbstractRestHandler<TestRequest, TestResponse, TestParameters> {
+
+ public static final Object LOCK = new Object();
+
+ TestHandler() {
+ super(new TestHeaders());
+ }
+
+ @Override
+ protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request) throws RestHandlerException {
+ if (request.getPathParameter(JobIDPathParameter.class) == null) {
+ throw new RestHandlerException("Path parameter was missing.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ } else {
+ Assert.assertEquals(request.getPathParameter(JobIDPathParameter.class).getValue(), PATH_JOB_ID);
+ }
+ if (request.getQueryParameter(JobIDQueryParameter.class) == null) {
+ throw new RestHandlerException("Query parameter was missing.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ } else {
+ Assert.assertEquals(request.getQueryParameter(JobIDQueryParameter.class).getValue().get(0), QUERY_JOB_ID);
+ }
+
+ if (request.getRequestBody().id == 1) {
+ synchronized (LOCK) {
+ try {
+ LOCK.notifyAll();
+ LOCK.wait();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ return CompletableFuture.completedFuture(new TestResponse(request.getRequestBody().id));
+ }
+ }
+
+ private static class TestRestClientEndpoint extends RestClientEndpoint {
+
+ TestRestClientEndpoint(RestClientEndpointConfiguration configuration) {
+ super(configuration);
+ }
+ }
+
+ private static class TestRequest implements RequestBody {
+ public final int id;
+
+ @JsonCreator
+ public TestRequest(@JsonProperty("id") int id) {
+ this.id = id;
+ }
+ }
+
+ private static class TestResponse implements ResponseBody {
+ public final int id;
+
+ @JsonCreator
+ public TestResponse(@JsonProperty("id") int id) {
+ this.id = id;
+ }
+ }
+
+ private static class TestHeaders implements MessageHeaders<TestRequest, TestResponse, TestParameters> {
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.POST;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/test/:jobid";
+ }
+
+ @Override
+ public Class<TestRequest> getRequestClass() {
+ return TestRequest.class;
+ }
+
+ @Override
+ public Class<TestResponse> getResponseClass() {
+ return TestResponse.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public TestParameters getUnresolvedMessageParameters() {
+ return new TestParameters();
+ }
+ }
+
+ private static class TestParameters extends MessageParameters {
+ private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
+ private final JobIDQueryParameter jobIDQueryParameter = new JobIDQueryParameter();
+
+ @Override
+ public Collection<MessagePathParameter> getPathParameters() {
+ return Collections.singleton(jobIDPathParameter);
+ }
+
+ @Override
+ public Collection<MessageQueryParameter> getQueryParameters() {
+ return Collections.singleton(jobIDQueryParameter);
+ }
+ }
+
+ static class JobIDPathParameter extends MessagePathParameter<JobID> {
+ JobIDPathParameter() {
+ super(JOB_ID_KEY, MessageParameterRequisiteness.MANDATORY);
+ }
+
+ @Override
+ public JobID convertFromString(String value) {
+ return JobID.fromHexString(value);
+ }
+
+ @Override
+ protected String convertToString(JobID value) {
+ return value.toString();
+ }
+ }
+
+ static class JobIDQueryParameter extends MessageQueryParameter<JobID> {
+ JobIDQueryParameter() {
+ super(JOB_ID_KEY, MessageParameterRequisiteness.MANDATORY);
+ }
+
+ @Override
+ public JobID convertValueFromString(String value) {
+ return JobID.fromHexString(value);
+ }
+
+ @Override
+ public String convertStringToValue(JobID value) {
+ return value.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
new file mode 100644
index 0000000..a5cfbf1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link MessageParameters}.
+ */
+public class MessageParametersTest {
+ @Test
+ public void testResolveUrl() {
+ String genericUrl = "/jobs/:jobid/state";
+ TestMessageParameters parameters = new TestMessageParameters();
+ JobID pathJobID = new JobID();
+ JobID queryJobID = new JobID();
+ parameters.pathParameter.resolve(pathJobID);
+ parameters.queryParameter.resolve(Collections.singletonList(queryJobID));
+
+ String resolvedUrl = MessageParameters.resolveUrl(genericUrl, parameters);
+
+ Assert.assertEquals("/jobs/" + pathJobID + "/state?jobid=" + queryJobID, resolvedUrl);
+ }
+
+ private static class TestMessageParameters extends MessageParameters {
+ private final TestPathParameter pathParameter = new TestPathParameter();
+ private final TestQueryParameter queryParameter = new TestQueryParameter();
+
+ @Override
+ public Collection<MessagePathParameter> getPathParameters() {
+ return Collections.singleton(pathParameter);
+ }
+
+ @Override
+ public Collection<MessageQueryParameter> getQueryParameters() {
+ return Collections.singleton(queryParameter);
+ }
+ }
+
+ private static class TestPathParameter extends MessagePathParameter<JobID> {
+
+ TestPathParameter() {
+ super("jobid", MessageParameterRequisiteness.MANDATORY);
+ }
+
+ @Override
+ public JobID convertFromString(String value) {
+ return JobID.fromHexString(value);
+ }
+
+ @Override
+ protected String convertToString(JobID value) {
+ return value.toString();
+ }
+ }
+
+ private static class TestQueryParameter extends MessageQueryParameter<JobID> {
+
+ TestQueryParameter() {
+ super("jobid", MessageParameterRequisiteness.MANDATORY);
+ }
+
+ @Override
+ public JobID convertValueFromString(String value) {
+ return JobID.fromHexString(value);
+ }
+
+ @Override
+ public String convertStringToValue(JobID value) {
+ return value.toString();
+ }
+ }
+}
[2/3] flink git commit: [FLINK-7040] [rest] Add basics for REST
communication
Posted by tr...@apache.org.
[FLINK-7040] [rest] Add basics for REST communication
Add better error message for get requests with a body
Consistent error message for 404
Rework resolve URL generation
Rework handler registration
Support concurrent requests
Rework client response receival
Rework handler response (remove HandlerResponse class)
tests: move client/server shutdown into finally block
Close connection in ClientHandler
Proper shutdown of netty stack
simplify RestClientEndpoint lambda chain
Provide handlers with access to MessageParameters
This closes #4569.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c019787e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c019787e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c019787e
Branch: refs/heads/master
Commit: c019787e5f6873b44607f55ba7a5bd876cb41421
Parents: c384e52
Author: zentol <ch...@apache.org>
Authored: Wed Aug 16 15:17:45 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 26 11:10:59 2017 +0200
----------------------------------------------------------------------
.../apache/flink/configuration/RestOptions.java | 43 ++++
.../flink/runtime/rest/HttpMethodWrapper.java | 39 +++
.../flink/runtime/rest/RestClientEndpoint.java | 235 +++++++++++++++++
.../rest/RestClientEndpointConfiguration.java | 111 +++++++++
.../flink/runtime/rest/RestServerEndpoint.java | 192 ++++++++++++++
.../rest/RestServerEndpointConfiguration.java | 107 ++++++++
.../rest/handler/AbstractRestHandler.java | 217 ++++++++++++++++
.../runtime/rest/handler/HandlerRequest.java | 104 ++++++++
.../rest/handler/PipelineErrorHandler.java | 55 ++++
.../rest/handler/RestHandlerException.java | 42 ++++
.../runtime/rest/handler/RouterHandler.java | 47 ++++
.../rest/messages/ErrorResponseBody.java | 47 ++++
.../runtime/rest/messages/MessageHeaders.java | 78 ++++++
.../runtime/rest/messages/MessageParameter.java | 140 +++++++++++
.../rest/messages/MessageParameters.java | 97 ++++++++
.../rest/messages/MessagePathParameter.java | 29 +++
.../rest/messages/MessageQueryParameter.java | 78 ++++++
.../runtime/rest/messages/RequestBody.java | 32 +++
.../runtime/rest/messages/ResponseBody.java | 32 +++
.../runtime/rest/util/RestClientException.java | 38 +++
.../runtime/rest/util/RestMapperUtils.java | 50 ++++
.../flink/runtime/rest/RestEndpointITCase.java | 249 +++++++++++++++++++
.../rest/messages/MessageParametersTest.java | 95 +++++++
23 files changed, 2157 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
new file mode 100644
index 0000000..a2a2013
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration parameters for REST communication.
+ */
+@Internal
+public class RestOptions {
+ /**
+ * The address that the server binds itself to / the client connects to.
+ */
+ public static final ConfigOption<String> REST_ADDRESS =
+ key("rest.address")
+ .defaultValue("localhost");
+
+ /**
+ * The port that the server listens on / the client connects to.
+ */
+ public static final ConfigOption<Integer> REST_PORT =
+ key("rest.port")
+ .defaultValue(9067);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
new file mode 100644
index 0000000..8987d75
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+
+/**
+ * This class wraps netty's {@link HttpMethod}s into an enum, allowing us to use them in switches.
+ */
+public enum HttpMethodWrapper {
+ GET(HttpMethod.GET),
+ POST(HttpMethod.POST);
+
+ private HttpMethod nettyHttpMethod;
+
+ HttpMethodWrapper(HttpMethod nettyHttpMethod) {
+ this.nettyHttpMethod = nettyHttpMethod;
+ }
+
+ public HttpMethod getNettyHttpMethod() {
+ return nettyHttpMethod;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
new file mode 100644
index 0000000..61e1d7b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClientEndpoint {
+ private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class);
+
+ private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+ private final String configuredTargetAddress;
+ private final int configuredTargetPort;
+
+ private Bootstrap bootstrap;
+
+ public RestClientEndpoint(RestClientEndpointConfiguration configuration) {
+ Preconditions.checkNotNull(configuration);
+ this.configuredTargetAddress = configuration.getTargetRestEndpointAddress();
+ this.configuredTargetPort = configuration.getTargetRestEndpointPort();
+
+ SSLEngine sslEngine = configuration.getSslEngine();
+ ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ // SSL should be the first handler in the pipeline
+ if (sslEngine != null) {
+ ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+ }
+
+ ch.pipeline()
+ .addLast(new HttpClientCodec())
+ .addLast(new HttpObjectAggregator(1024 * 1024))
+ .addLast(new ClientHandler())
+ .addLast(new PipelineErrorHandler(LOG));
+ }
+ };
+ NioEventLoopGroup group = new NioEventLoopGroup(1);
+
+ bootstrap = new Bootstrap();
+ bootstrap
+ .group(group)
+ .channel(NioSocketChannel.class)
+ .handler(initializer);
+
+ LOG.info("Rest client endpoint started.");
+ }
+
+ public void shutdown() {
+ LOG.info("Shutting down rest endpoint.");
+ CompletableFuture<?> groupFuture = new CompletableFuture<>();
+ if (bootstrap != null) {
+ if (bootstrap.group() != null) {
+ bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+ .addListener(ignored -> groupFuture.complete(null));
+ }
+ }
+
+ try {
+ groupFuture.get(5, TimeUnit.SECONDS);
+ LOG.info("Rest endpoint shutdown complete.");
+ } catch (Exception e) {
+ LOG.warn("Rest endpoint shutdown failed.", e);
+ }
+ }
+
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) throws IOException {
+ Preconditions.checkNotNull(messageHeaders);
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(messageParameters);
+ Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+ String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+ LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
+ // serialize payload
+ StringWriter sw = new StringWriter();
+ objectMapper.writeValue(sw, request);
+ ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+ // create request and set headers
+ FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+ httpRequest.headers()
+ .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
+ .add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
+ .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort)
+ .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+ return submitRequest(httpRequest, messageHeaders.getResponseClass());
+ }
+
+ private <P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest httpRequest, Class<P> responseClass) {
+ return CompletableFuture.supplyAsync(() -> bootstrap.connect(configuredTargetAddress, configuredTargetPort))
+ .thenApply((channel) -> {
+ try {
+ return channel.sync();
+ } catch (InterruptedException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ })
+ .thenApply((ChannelFuture::channel))
+ .thenCompose(channel -> {
+ ClientHandler handler = channel.pipeline().get(ClientHandler.class);
+ CompletableFuture<JsonNode> future = handler.getJsonFuture();
+ channel.writeAndFlush(httpRequest);
+ return future.thenCompose(rawResponse -> parseResponse(rawResponse, responseClass));
+ });
+ }
+
+ private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
+ CompletableFuture<P> responseFuture = new CompletableFuture<>();
+ try {
+ P response = objectMapper.treeToValue(rawResponse, responseClass);
+ responseFuture.complete(response);
+ } catch (JsonProcessingException jpe) {
+ // the received response did not matched the expected response type
+
+ // lets see if it is an ErrorResponse instead
+ try {
+ ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
+ responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
+ } catch (JsonProcessingException jpe2) {
+ // if this fails it is either the expected type or response type was wrong, most likely caused
+ // by a client/search MessageHeaders mismatch
+ LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse);
+ responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error."));
+ }
+ }
+ return responseFuture;
+ }
+
+ private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
+
+ private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
+
+ CompletableFuture<JsonNode> getJsonFuture() {
+ return jsonFuture;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+ if (msg instanceof FullHttpResponse) {
+ readRawResponse((FullHttpResponse) msg);
+ } else {
+ LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
+ jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
+ }
+ ctx.close();
+ }
+
+ private void readRawResponse(FullHttpResponse msg) {
+ ByteBuf content = msg.content();
+
+ JsonNode rawResponse;
+ try {
+ InputStream in = new ByteBufInputStream(content);
+ rawResponse = objectMapper.readTree(in);
+ LOG.debug("Received response {}.", rawResponse);
+ } catch (JsonParseException je) {
+ LOG.error("Response was not valid JSON.", je);
+ jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
+ return;
+ } catch (IOException ioe) {
+ LOG.error("Response could not be read.", ioe);
+ jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
+ return;
+ }
+ jsonFuture.complete(rawResponse);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
new file mode 100644
index 0000000..420335c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * A configuration object for {@link RestClientEndpoint}s.
+ */
+public final class RestClientEndpointConfiguration {
+
+ private final String targetRestEndpointAddress;
+ private final int targetRestEndpointPort;
+ @Nullable
+ private final SSLEngine sslEngine;
+
+ private RestClientEndpointConfiguration(String targetRestEndpointAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
+ this.targetRestEndpointAddress = Preconditions.checkNotNull(targetRestEndpointAddress);
+ this.targetRestEndpointPort = targetRestEndpointPort;
+ this.sslEngine = sslEngine;
+ }
+
+ /**
+ * Returns the address of the REST server endpoint to connect to.
+ *
+ * @return REST server endpoint address
+ */
+ public String getTargetRestEndpointAddress() {
+ return targetRestEndpointAddress;
+ }
+
+ /**
+ * Returns the por tof the REST server endpoint to connect to.
+ *
+ * @return REST server endpoint port
+ */
+ public int getTargetRestEndpointPort() {
+ return targetRestEndpointPort;
+ }
+
+ /**
+ * Returns the {@link SSLEngine} that the REST client endpoint should use.
+ *
+ * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
+ */
+
+ public SSLEngine getSslEngine() {
+ return sslEngine;
+ }
+
+ /**
+ * Creates and returns a new {@link RestClientEndpointConfiguration} from the given {@link Configuration}.
+ *
+ * @param config configuration from which the REST client endpoint configuration should be created from
+ * @return REST client endpoint configuration
+ * @throws ConfigurationException if SSL was configured incorrectly
+ */
+
+ public static RestClientEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
+ Preconditions.checkNotNull(config);
+ String address = config.getString(RestOptions.REST_ADDRESS);
+ if (address == null) {
+ throw new ConfigurationException("The address of the REST server was not configured under " + RestOptions.REST_ADDRESS.key() + ".");
+ }
+
+ int port = config.getInteger(RestOptions.REST_PORT);
+ Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
+
+ SSLEngine sslEngine = null;
+ boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+ if (enableSSL) {
+ try {
+ SSLContext sslContext = SSLUtils.createSSLServerContext(config);
+ if (sslContext != null) {
+ sslEngine = sslContext.createSSLEngine();
+ SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
+ sslEngine.setUseClientMode(false);
+ }
+ } catch (Exception e) {
+ throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
+ }
+ }
+
+ return new RestClientEndpointConfiguration(address, port, sslEngine);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
new file mode 100644
index 0000000..6670267
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.handler.RouterHandler;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An abstract class for netty-based REST server endpoints.
+ */
+public abstract class RestServerEndpoint {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final String configuredAddress;
+ private final int configuredPort;
+ private final SSLEngine sslEngine;
+ private final Router router = new Router();
+
+ private ServerBootstrap bootstrap;
+ private Channel serverChannel;
+
+ public RestServerEndpoint(RestServerEndpointConfiguration configuration) {
+ Preconditions.checkNotNull(configuration);
+ this.configuredAddress = configuration.getEndpointBindAddress();
+ this.configuredPort = configuration.getEndpointBindPort();
+ this.sslEngine = configuration.getSslEngine();
+ }
+
+ /**
+ * This method is called at the beginning of {@link #start()} to setup all handlers that the REST server endpoint
+ * implementation requires.
+ */
+ protected abstract Collection<AbstractRestHandler<?, ?, ?>> initializeHandlers();
+
+ /**
+ * Starts this REST server endpoint.
+ */
+ public void start() {
+ log.info("Starting rest endpoint.");
+ initializeHandlers()
+ .forEach(this::registerHandler);
+
+ ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
+
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ Handler handler = new RouterHandler(router);
+
+ // SSL should be the first handler in the pipeline
+ if (sslEngine != null) {
+ ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+ }
+
+ ch.pipeline()
+ .addLast(new HttpServerCodec())
+ .addLast(new HttpObjectAggregator(1024 * 1024 * 10))
+ .addLast(handler.name(), handler)
+ .addLast(new PipelineErrorHandler(log));
+ }
+ };
+
+ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
+ NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+ bootstrap = new ServerBootstrap();
+ bootstrap
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(initializer);
+
+ ChannelFuture ch;
+ if (configuredAddress == null) {
+ ch = bootstrap.bind(configuredPort);
+ } else {
+ ch = bootstrap.bind(configuredAddress, configuredPort);
+ }
+ serverChannel = ch.syncUninterruptibly().channel();
+
+ InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
+ String address = bindAddress.getAddress().getHostAddress();
+ int port = bindAddress.getPort();
+
+ log.info("Rest endpoint listening at {}" + ':' + "{}", address, port);
+ }
+
+ private <R extends RequestBody, P extends ResponseBody> void registerHandler(AbstractRestHandler<R, P, ?> handler) {
+ switch (handler.getMessageHeaders().getHttpMethod()) {
+ case GET:
+ router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+ break;
+ case POST:
+ router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+ break;
+ }
+ }
+
+ /**
+ * Returns the address on which this endpoint is accepting requests.
+ *
+ * @return address on which this endpoint is accepting requests
+ */
+ public InetSocketAddress getServerAddress() {
+ Channel server = this.serverChannel;
+ if (server != null) {
+ try {
+ return ((InetSocketAddress) server.localAddress());
+ } catch (Exception e) {
+ log.error("Cannot access local server address", e);
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Stops this REST server endpoint.
+ */
+ public void shutdown() {
+ log.info("Shutting down rest endpoint.");
+
+ CompletableFuture<?> channelFuture = new CompletableFuture<>();
+ if (this.serverChannel != null) {
+ this.serverChannel.close().addListener(ignored -> channelFuture.complete(null));
+ }
+ CompletableFuture<?> groupFuture = new CompletableFuture<>();
+ CompletableFuture<?> childGroupFuture = new CompletableFuture<>();
+
+ channelFuture.thenRun(() -> {
+ if (bootstrap != null) {
+ if (bootstrap.group() != null) {
+ bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+ .addListener(ignored -> groupFuture.complete(null));
+ }
+ if (bootstrap.childGroup() != null) {
+ bootstrap.childGroup().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+ .addListener(ignored -> childGroupFuture.complete(null));
+ }
+ }
+ });
+
+ try {
+ CompletableFuture.allOf(groupFuture, childGroupFuture)
+ .get(10, TimeUnit.SECONDS);
+ log.info("Rest endpoint shutdown complete.");
+ } catch (Exception e) {
+ log.warn("Rest endpoint shutdown failed.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
new file mode 100644
index 0000000..f910f2c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * A configuration object for {@link RestServerEndpoint}s.
+ */
+public final class RestServerEndpointConfiguration {
+
+ @Nullable
+ private final String restBindAddress;
+ private final int restBindPort;
+ @Nullable
+ private final SSLEngine sslEngine;
+
+ private RestServerEndpointConfiguration(@Nullable String restBindAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
+ this.restBindAddress = restBindAddress;
+ this.restBindPort = targetRestEndpointPort;
+ this.sslEngine = sslEngine;
+ }
+
+ /**
+ * Returns the address that the REST server endpoint should bind itself to.
+ *
+ * @return address that the REST server endpoint should bind itself to
+ */
+ public String getEndpointBindAddress() {
+ return restBindAddress;
+ }
+
+ /**
+ * Returns the port that the REST server endpoint should listen on.
+ *
+ * @return port that the REST server endpoint should listen on
+ */
+ public int getEndpointBindPort() {
+ return restBindPort;
+ }
+
+ /**
+ * Returns the {@link SSLEngine} that the REST server endpoint should use.
+ *
+ * @return SSLEngine that the REST server endpoint should use, or null if SSL was disabled
+ */
+ public SSLEngine getSslEngine() {
+ return sslEngine;
+ }
+
+ /**
+ * Creates and returns a new {@link RestServerEndpointConfiguration} from the given {@link Configuration}.
+ *
+ * @param config configuration from which the REST server endpoint configuration should be created from
+ * @return REST server endpoint configuration
+ * @throws ConfigurationException if SSL was configured incorrectly
+ */
+ public static RestServerEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
+ Preconditions.checkNotNull(config);
+ String address = config.getString(RestOptions.REST_ADDRESS);
+
+ int port = config.getInteger(RestOptions.REST_PORT);
+ Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
+
+ SSLEngine sslEngine = null;
+ boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+ if (enableSSL) {
+ try {
+ SSLContext sslContext = SSLUtils.createSSLServerContext(config);
+ if (sslContext != null) {
+ sslEngine = sslContext.createSSLEngine();
+ SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
+ sslEngine.setUseClientMode(false);
+ }
+ } catch (Exception e) {
+ throw new ConfigurationException("Failed to initialize SSLContext for REST server endpoint.", e);
+ }
+ }
+
+ return new RestServerEndpointConfiguration(address, port, sslEngine);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
new file mode 100644
index 0000000..07fce62
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Super class for netty-based handlers that work with {@link RequestBody}s and {@link ResponseBody}s.
+ *
+ * <p>Subclasses must be thread-safe.
+ *
+ * @param <R> type of incoming requests
+ * @param <P> type of outgoing responses
+ */
+@ChannelHandler.Sharable
+public abstract class AbstractRestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends SimpleChannelInboundHandler<Routed> {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
+
+ private final MessageHeaders<R, P, M> messageHeaders;
+
+ protected AbstractRestHandler(MessageHeaders<R, P, M> messageHeaders) {
+ this.messageHeaders = messageHeaders;
+ }
+
+ public MessageHeaders<R, P, M> getMessageHeaders() {
+ return messageHeaders;
+ }
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception {
+ log.debug("Received request.");
+ final HttpRequest httpRequest = routed.request();
+
+ try {
+ if (!(httpRequest instanceof FullHttpRequest)) {
+ // The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
+ // FullHttpRequests.
+ log.error("Implementation error: Received a request that wasn't a FullHttpRequest.");
+ sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, httpRequest);
+ return;
+ }
+
+ ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
+
+ R request;
+ if (msgContent.capacity() == 0) {
+ try {
+ request = mapper.readValue("{}", messageHeaders.getRequestClass());
+ } catch (JsonParseException | JsonMappingException je) {
+ log.error("Implementation error: Get request bodies must have a no-argument constructor.", je);
+ sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+ return;
+ }
+ } else {
+ try {
+ ByteBufInputStream in = new ByteBufInputStream(msgContent);
+ request = mapper.readValue(in, messageHeaders.getRequestClass());
+ } catch (JsonParseException | JsonMappingException je) {
+ log.error("Failed to read request.", je);
+ sendErrorResponse(new ErrorResponseBody(String.format("Request did not match expected format %s.", messageHeaders.getRequestClass().getSimpleName())), HttpResponseStatus.BAD_REQUEST, ctx, httpRequest);
+ return;
+ }
+ }
+
+ CompletableFuture<P> response;
+ try {
+ HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
+ response = handleRequest(handlerRequest);
+ } catch (RestHandlerException rhe) {
+ sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
+ return;
+ } catch (Exception e) {
+ response = FutureUtils.completedExceptionally(e);
+ }
+
+ response.whenComplete((P resp, Throwable error) -> {
+ if (error != null) {
+ if (error instanceof RestHandlerException) {
+ RestHandlerException rhe = (RestHandlerException) error;
+ sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
+ } else {
+ log.error("Implementation error: Unhandled exception.", error);
+ sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+ }
+ } else {
+ sendResponse(messageHeaders.getResponseStatusCode(), resp, ctx, httpRequest);
+ }
+ });
+ } catch (Exception e) {
+ log.error("Request processing failed.", e);
+ sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+ }
+ }
+
+ /**
+ * This method is called for every incoming request and returns a {@link CompletableFuture} containing a the response.
+ *
+ * <p>Implementations may decide whether to throw {@link RestHandlerException}s or fail the returned
+ * {@link CompletableFuture} with a {@link RestHandlerException}.
+ *
+ * <p>Failing the future with another exception type or throwing unchecked exceptions is regarded as an
+ * implementation error as it does not allow us to provide a meaningful HTTP status code. In this case a
+ * {@link HttpResponseStatus#INTERNAL_SERVER_ERROR} will be returned.
+ *
+ * @param request request that should be handled
+ * @return future containing a handler response
+ * @throws RestHandlerException if the handling failed
+ */
+ protected abstract CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request) throws RestHandlerException;
+
+ private static <P extends ResponseBody> void sendResponse(HttpResponseStatus statusCode, P response, ChannelHandlerContext ctx, HttpRequest httpRequest) {
+ StringWriter sw = new StringWriter();
+ try {
+ mapper.writeValue(sw, response);
+ } catch (IOException ioe) {
+ sendErrorResponse(new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+ return;
+ }
+ sendResponse(ctx, httpRequest, statusCode, sw.toString());
+ }
+
+ static void sendErrorResponse(ErrorResponseBody error, HttpResponseStatus statusCode, ChannelHandlerContext ctx, HttpRequest httpRequest) {
+
+ StringWriter sw = new StringWriter();
+ try {
+ mapper.writeValue(sw, error);
+ } catch (IOException e) {
+ // this should never happen
+ sendResponse(ctx, httpRequest, HttpResponseStatus.INTERNAL_SERVER_ERROR, "Internal server error. Could not map error response to JSON.");
+ }
+ sendResponse(ctx, httpRequest, statusCode, sw.toString());
+ }
+
+ private static void sendResponse(@Nonnull ChannelHandlerContext ctx, @Nonnull HttpRequest httpRequest, @Nonnull HttpResponseStatus statusCode, @Nonnull String message) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode);
+
+ response.headers().set(CONTENT_TYPE, "application/json");
+
+ if (HttpHeaders.isKeepAlive(httpRequest)) {
+ response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ }
+
+ byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
+ ByteBuf b = Unpooled.copiedBuffer(buf);
+ HttpHeaders.setContentLength(response, buf.length);
+
+ // write the initial line and the header.
+ ctx.write(response);
+
+ ctx.write(b);
+
+ ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+
+ // close the connection, if no keep-alive is needed
+ if (!HttpHeaders.isKeepAlive(httpRequest)) {
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
new file mode 100644
index 0000000..90cc3e7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+/**
+ * Simple container for the request to a handler, that contains the {@link RequestBody} and path/query parameters.
+ *
+ * @param <R> type of the contained request body
+ * @param <M> type of the contained message parameters
+ */
+public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
+
+ private final R requestBody;
+ private final Map<Class<? extends MessagePathParameter>, MessagePathParameter<?>> pathParameters = new HashMap<>();
+ private final Map<Class<? extends MessageQueryParameter>, MessageQueryParameter<?>> queryParameters = new HashMap<>();
+
+ public HandlerRequest(R requestBody, M messageParameters, Map<String, String> pathParameters, Map<String, List<String>> queryParameters) {
+ this.requestBody = Preconditions.checkNotNull(requestBody);
+ Preconditions.checkNotNull(messageParameters);
+ Preconditions.checkNotNull(queryParameters);
+ Preconditions.checkNotNull(pathParameters);
+
+ for (MessagePathParameter<?> pathParameter : messageParameters.getPathParameters()) {
+ String value = pathParameters.get(pathParameter.getKey());
+ if (value != null) {
+ pathParameter.resolveFromString(value);
+ this.pathParameters.put(pathParameter.getClass(), pathParameter);
+ }
+ }
+
+ for (MessageQueryParameter<?> queryParameter : messageParameters.getQueryParameters()) {
+ List<String> values = queryParameters.get(queryParameter.getKey());
+ if (values != null && values.size() > 0) {
+ StringJoiner joiner = new StringJoiner(",");
+ values.forEach(joiner::add);
+ queryParameter.resolveFromString(joiner.toString());
+ this.queryParameters.put(queryParameter.getClass(), queryParameter);
+ }
+
+ }
+ }
+
+ /**
+ * Returns the request body.
+ *
+ * @return request body
+ */
+ public R getRequestBody() {
+ return requestBody;
+ }
+
+ /**
+ * Returns the {@link MessagePathParameter} for the given class.
+ *
+ * @param parameterClass class of the parameter
+ * @param <X> the value type that the parameter contains
+ * @param <PP> type of the path parameter
+ * @return path parameter for the given class, or null if no parameter value exists for the given class
+ */
+ @SuppressWarnings("unchecked")
+ public <X, PP extends MessagePathParameter<X>> PP getPathParameter(Class<PP> parameterClass) {
+ return (PP) pathParameters.get(parameterClass);
+ }
+
+ /**
+ * Returns the {@link MessageQueryParameter} for the given class.
+ *
+ * @param parameterClass class of the parameter
+ * @param <X> the value type that the parameter contains
+ * @param <QP> type of the query parameter
+ * @return query parameter for the given class, or null if no parameter value exists for the given class
+ */
+ @SuppressWarnings("unchecked")
+ public <X, QP extends MessageQueryParameter<X>> QP getQueryParameter(Class<QP> parameterClass) {
+ return (QP) queryParameters.get(parameterClass);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
new file mode 100644
index 0000000..742931b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.slf4j.Logger;
+
+/**
+ * This is the last handler in the pipeline. It logs all error messages.
+ */
+@ChannelHandler.Sharable
+public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpRequest> {
+
+ /** The logger to which the handler writes the log statements. */
+ private final Logger logger;
+
+ public PipelineErrorHandler(Logger logger) {
+ this.logger = logger;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest message) {
+ // we can't deal with this message. No one in the pipeline handled it. Log it.
+ logger.debug("Unknown message received: {}", message);
+ AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, message);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ logger.debug("Unhandled exception: {}", cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
new file mode 100644
index 0000000..a235f7e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * An exception that is thrown if the failure of a REST operation was detected by a handler.
+ */
+public class RestHandlerException extends Exception {
+ private final String errorMessage;
+ private final HttpResponseStatus errorCode;
+
+ public RestHandlerException(String errorMessage, HttpResponseStatus errorCode) {
+ this.errorMessage = errorMessage;
+ this.errorCode = errorCode;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public HttpResponseStatus getErrorCode() {
+ return errorCode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
new file mode 100644
index 0000000..72b779b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is an extension of {@link Handler} that replaces the standard error response to be identical with those
+ * sent by the {@link AbstractRestHandler}.
+ */
+public class RouterHandler extends Handler {
+ private static final Logger LOG = LoggerFactory.getLogger(RouterHandler.class);
+
+ public RouterHandler(Router router) {
+ super(router);
+ }
+
+ @Override
+ protected void respondNotFound(ChannelHandlerContext ctx, HttpRequest request) {
+ AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Not found."), HttpResponseStatus.NOT_FOUND, ctx, request);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
new file mode 100644
index 0000000..0a7d69e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Generic response body for communicating errors on the server.
+ */
+public final class ErrorResponseBody implements ResponseBody {
+
+ static final String FIELD_NAME_ERRORS = "errors";
+
+ @JsonProperty(FIELD_NAME_ERRORS)
+ public final List<String> errors;
+
+ public ErrorResponseBody(String error) {
+ this(Collections.singletonList(error));
+ }
+
+ @JsonCreator
+ public ErrorResponseBody(
+ @JsonProperty(FIELD_NAME_ERRORS) List<String> errors) {
+
+ this.errors = errors;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
new file mode 100644
index 0000000..254c231
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * This class links {@link RequestBody}s to {@link ResponseBody}s types and contains meta-data required for their http headers.
+ *
+ * <p>Implementations must be state-less.
+ *
+ * @param <R> request message type
+ * @param <P> response message type
+ * @param <M> message parameters type
+ */
+public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> {
+
+ /**
+ * Returns the class of the request message.
+ *
+ * @return class of the request message
+ */
+ Class<R> getRequestClass();
+
+ /**
+ * Returns the {@link HttpMethodWrapper} to be used for the request.
+ *
+ * @return http method to be used for the request
+ */
+ HttpMethodWrapper getHttpMethod();
+
+ /**
+ * Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}.
+ *
+ * @return endpoint url that this request should be sent to
+ */
+ String getTargetRestEndpointURL();
+
+ /**
+ * Returns the class of the response message.
+ *
+ * @return class of the response message
+ */
+ Class<P> getResponseClass();
+
+ /**
+ * Returns the http status code for the response.
+ *
+ * @return http status code of the response
+ */
+ HttpResponseStatus getResponseStatusCode();
+
+ /**
+ * Returns a new {@link MessageParameters} object.
+ *
+ * @return new message parameters object
+ */
+ M getUnresolvedMessageParameters();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
new file mode 100644
index 0000000..b422d87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This class represents a single path/query parameter that can be used for a request. Every parameter has an associated
+ * key, and a one-time settable value.
+ *
+ * <p>Parameters are either mandatory or optional, indicating whether the parameter must be resolved for the request.
+ *
+ * <p>All parameters support symmetric conversion from their actual type and string via {@link #convertFromString(String)}
+ * and {@link #convertToString(Object)}. The conversion from {@code X} to string is required on the client to assemble the
+ * URL, whereas the conversion from string to {@code X} is required on the client to provide properly typed parameters
+ * to the handlers.
+ *
+ * @see MessagePathParameter
+ * @see MessageQueryParameter
+ */
+public abstract class MessageParameter<X> {
+ private boolean resolved = false;
+
+ private final MessageParameterRequisiteness requisiteness;
+
+ private final String key;
+ private X value;
+
+ MessageParameter(String key, MessageParameterRequisiteness requisiteness) {
+ this.key = key;
+ this.requisiteness = requisiteness;
+ }
+
+ /**
+ * Returns whether this parameter has been resolved.
+ *
+ * @return true, if this parameter was resolved, false otherwise
+ */
+ public final boolean isResolved() {
+ return resolved;
+ }
+
+ /**
+ * Resolves this parameter for the given value.
+ *
+ * @param value value to resolve this parameter with
+ */
+ public final void resolve(X value) {
+ Preconditions.checkState(!resolved, "This parameter was already resolved.");
+ this.value = value;
+ this.resolved = true;
+ }
+
+ /**
+ * Resolves this parameter for the given string value representation.
+ *
+ * @param value string representation of value to resolve this parameter with
+ */
+ public final void resolveFromString(String value) {
+ resolve(convertFromString(value));
+ }
+
+ /**
+ * Converts the given string to a valid value of this parameter.
+ *
+ * @param value string representation of parameter value
+ * @return parameter value
+ */
+ protected abstract X convertFromString(String value);
+
+ /**
+ * Converts the given value to its string representation.
+ *
+ * @param value parameter value
+ * @return string representation of typed value
+ */
+ protected abstract String convertToString(X value);
+
+ /**
+ * Returns the key of this parameter, e.g. "jobid".
+ *
+ * @return key of this parameter
+ */
+ public final String getKey() {
+ return key;
+ }
+
+ /**
+ * Returs the resolved value of this parameter, or {@code null} if it isn't resolved yet.
+ *
+ * @return resolved value, or null if it wasn't resolved yet
+ */
+ public final X getValue() {
+ return value;
+ }
+
+ /**
+ * Returs the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
+ *
+ * @return resolved value, or null if it wasn't resolved yet
+ */
+ final String getValueAsString() {
+ return value == null
+ ? null
+ : convertToString(value);
+ }
+
+ /**
+ * Returns whether this parameter must be resolved for the request.
+ *
+ * @return true if the parameter is mandatory, false otherwise
+ */
+ public final boolean isMandatory() {
+ return requisiteness == MessageParameterRequisiteness.MANDATORY;
+ }
+
+ /**
+ * Enum for indicating whether a parameter is mandatory or optional.
+ */
+ protected enum MessageParameterRequisiteness {
+ MANDATORY,
+ OPTIONAL
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
new file mode 100644
index 0000000..30ada54
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/**
+ * This class defines the path/query {@link MessageParameter}s that can be used for a request.
+ */
+public abstract class MessageParameters {
+
+ /**
+ * Returns the collection of {@link MessagePathParameter} that the request supports. The collection should not be
+ * modifiable.
+ *
+ * @return collection of all supported message path parameters
+ */
+ public abstract Collection<MessagePathParameter> getPathParameters();
+
+ /**
+ * Returns the collection of {@link MessageQueryParameter} that the request supports. The collection should not be
+ * modifiable.
+ *
+ * @return collection of all supported message query parameters
+ */
+ public abstract Collection<MessageQueryParameter> getQueryParameters();
+
+ /**
+ * Returns whether all mandatory parameters have been resolved.
+ *
+ * @return true, if all mandatory parameters have been resolved, false otherwise
+ */
+ public final boolean isResolved() {
+ return getPathParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved())
+ && getQueryParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved());
+ }
+
+ /**
+ * Resolves the given URL (e.g "jobs/:jobid") using the given path/query parameters.
+ *
+ * <p>This method will fail with an {@link IllegalStateException} if any mandatory parameter was not resolved.
+ *
+ * <p>Unresolved optional parameters will be ignored.
+ *
+ * @param genericUrl URL to resolve
+ * @param parameters message parameters parameters
+ * @return resolved url, e.g "/jobs/1234?state=running"
+ * @throws IllegalStateException if any mandatory parameter was not resolved
+ */
+ public static String resolveUrl(String genericUrl, MessageParameters parameters) {
+ Preconditions.checkState(parameters.isResolved(), "Not all mandatory message parameters were resolved.");
+ StringBuilder path = new StringBuilder(genericUrl);
+ StringBuilder queryParameters = new StringBuilder();
+
+ for (MessageParameter pathParameter : parameters.getPathParameters()) {
+ if (pathParameter.isResolved()) {
+ int start = path.indexOf(":" + pathParameter.getKey());
+ path.replace(start, start + pathParameter.getKey().length() + 1, pathParameter.getValueAsString());
+ }
+ }
+ boolean isFirstQueryParameter = true;
+ for (MessageQueryParameter queryParameter : parameters.getQueryParameters()) {
+ if (parameters.isResolved()) {
+ if (isFirstQueryParameter) {
+ queryParameters.append("?");
+ isFirstQueryParameter = false;
+ } else {
+ queryParameters.append("&");
+ }
+ queryParameters.append(queryParameter.getKey());
+ queryParameters.append("=");
+ queryParameters.append(queryParameter.getValueAsString());
+ }
+ }
+ path.append(queryParameters);
+
+ return path.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessagePathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessagePathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessagePathParameter.java
new file mode 100644
index 0000000..2355323
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessagePathParameter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * This class represents path parameters of a request. For example, the URL "/jobs/:jobid" has a
+ * "jobid" path parameter that is later replaced with an actual value.
+ */
+public abstract class MessagePathParameter<X> extends MessageParameter<X> {
+ protected MessagePathParameter(String key, MessageParameterRequisiteness requisiteness) {
+ super(key, requisiteness);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
new file mode 100644
index 0000000..506a14b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class represents query parameters of a request. For example, the URL "/jobs?state=running" has a
+ * "state" query parameter, with "running" being its value string representation.
+ *
+ * <p>Query parameters may both occur multiple times or be of the form "key=value1,value2,value3". If a query parameter
+ * is specified multiple times the individual values are concatenated with {@code ,} and passed as a single value to
+ * {@link #convertToString(List)}.
+ */
+public abstract class MessageQueryParameter<X> extends MessageParameter<List<X>> {
+ protected MessageQueryParameter(String key, MessageParameterRequisiteness requisiteness) {
+ super(key, requisiteness);
+ }
+
+ @Override
+ public List<X> convertFromString(String values) {
+ String[] splitValues = values.split(",");
+ List<X> list = new ArrayList<>();
+ for (String value : splitValues) {
+ list.add(convertValueFromString(value));
+ }
+ return list;
+ }
+
+ /**
+ * Converts the given string to a valid value of this parameter.
+ *
+ * @param value string representation of parameter value
+ * @return parameter value
+ */
+ public abstract X convertValueFromString(String value);
+
+ @Override
+ public String convertToString(List<X> values) {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (X value : values) {
+ if (first) {
+ sb.append(convertStringToValue(value));
+ first = false;
+ } else {
+ sb.append(",");
+ sb.append(convertStringToValue(value));
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Converts the given value to its string representation.
+ *
+ * @param value parameter value
+ * @return string representation of typed value
+ */
+ public abstract String convertStringToValue(X value);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RequestBody.java
new file mode 100644
index 0000000..ca55b17
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RequestBody.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Marker interface for all requests of the REST API. This class represents the http body of a request.
+ *
+ * <p>Subclass instances are converted to JSON using jackson-databind. Subclasses must have a constructor that accepts
+ * all fields of the JSON request, that should be annotated with {@code @JsonCreator}.
+ *
+ * <p>All fields that should part of the JSON request must be accessible either by being public or having a getter.
+ *
+ * <p>When adding methods that are prefixed with {@code get} make sure to annotate them with {@code @JsonIgnore}.
+ */
+public interface RequestBody {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ResponseBody.java
new file mode 100644
index 0000000..d4e94d1d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ResponseBody.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Marker interface for all responses of the REST API. This class represents the http body of a response.
+ *
+ * <p>Subclass instances are converted to JSON using jackson-databind. Subclasses must have a constructor that accepts
+ * all fields of the JSON response, that should be annotated with {@code @JsonCreator}.
+ *
+ * <p>All fields that should part of the JSON response must be accessible either by being public or having a getter.
+ *
+ * <p>When adding methods that are prefixed with {@code get} make sure to annotate them with {@code @JsonIgnore}.
+ */
+public interface ResponseBody {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
new file mode 100644
index 0000000..10328ac
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.util;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * An exception that is thrown if the failure of a REST operation was detected on the client.
+ */
+public class RestClientException extends FlinkException {
+ public RestClientException(String message) {
+ super(message);
+ }
+
+ public RestClientException(Throwable cause) {
+ super(cause);
+ }
+
+ public RestClientException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
new file mode 100644
index 0000000..647a708
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.util;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+/**
+ * This class contains utilities for mapping requests and responses to/from JSON.
+ */
+public class RestMapperUtils {
+ private static final ObjectMapper objectMapper;
+
+ static {
+ objectMapper = new ObjectMapper();
+ objectMapper.enable(
+ DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
+ DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
+ DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
+ DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
+ objectMapper.disable(
+ SerializationFeature.FAIL_ON_EMPTY_BEANS);
+ }
+
+ /**
+ * Returns a preconfigured {@link ObjectMapper}.
+ *
+ * @return preconfigured object mapper
+ */
+ public static ObjectMapper getStrictObjectMapper() {
+ return objectMapper;
+ }
+}
[3/3] flink git commit: [FLINK-7040] [rest] Introduce executor,
shutdown timeouts and future completion in failure case to
RestServerEndpoint
Posted by tr...@apache.org.
[FLINK-7040] [rest] Introduce executor, shutdown timeouts and future completion in failure case to RestServerEndpoint
This commit also moves the target address and target port specification to the
RestClient#sendRequest call instead of passing the connection information to the
constructor of the RestClient.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bafddd79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bafddd79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bafddd79
Branch: refs/heads/master
Commit: bafddd7985271bea2557b57bab9ca1cc457124fa
Parents: c019787
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 25 11:46:04 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 26 11:11:00 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rest/RestClient.java | 238 +++++++++++++++++++
.../runtime/rest/RestClientConfiguration.java | 81 +++++++
.../flink/runtime/rest/RestClientEndpoint.java | 235 ------------------
.../rest/RestClientEndpointConfiguration.java | 111 ---------
.../flink/runtime/rest/RestServerEndpoint.java | 51 ++--
.../rest/RestServerEndpointConfiguration.java | 7 +-
.../rest/handler/AbstractRestHandler.java | 8 +-
.../runtime/rest/handler/HandlerRequest.java | 26 +-
.../rest/handler/PipelineErrorHandler.java | 4 +-
.../rest/handler/RestHandlerException.java | 2 +
.../runtime/rest/messages/MessageParameter.java | 12 +-
.../rest/messages/MessageParameters.java | 28 ++-
.../runtime/rest/util/RestClientException.java | 3 +
.../flink/runtime/rest/RestEndpointITCase.java | 37 ++-
.../rest/messages/MessageParametersTest.java | 7 +-
15 files changed, 430 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
new file mode 100644
index 0000000..7422ece
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClient {
+ private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
+
+ private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+ // used to open connections to a rest server endpoint
+ private final Executor executor;
+
+ private Bootstrap bootstrap;
+
+ public RestClient(RestClientConfiguration configuration, Executor executor) {
+ Preconditions.checkNotNull(configuration);
+ this.executor = Preconditions.checkNotNull(executor);
+
+ SSLEngine sslEngine = configuration.getSslEngine();
+ ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ // SSL should be the first handler in the pipeline
+ if (sslEngine != null) {
+ ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+ }
+
+ ch.pipeline()
+ .addLast(new HttpClientCodec())
+ .addLast(new HttpObjectAggregator(1024 * 1024))
+ .addLast(new ClientHandler())
+ .addLast(new PipelineErrorHandler(LOG));
+ }
+ };
+ NioEventLoopGroup group = new NioEventLoopGroup(1);
+
+ bootstrap = new Bootstrap();
+ bootstrap
+ .group(group)
+ .channel(NioSocketChannel.class)
+ .handler(initializer);
+
+ LOG.info("Rest client endpoint started.");
+ }
+
+ public void shutdown(Time timeout) {
+ LOG.info("Shutting down rest endpoint.");
+ CompletableFuture<?> groupFuture = new CompletableFuture<>();
+ if (bootstrap != null) {
+ if (bootstrap.group() != null) {
+ bootstrap.group().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+ .addListener(ignored -> groupFuture.complete(null));
+ }
+ }
+
+ try {
+ groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ LOG.info("Rest endpoint shutdown complete.");
+ } catch (Exception e) {
+ LOG.warn("Rest endpoint shutdown failed.", e);
+ }
+ }
+
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+ Preconditions.checkNotNull(targetAddress);
+ Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+ Preconditions.checkNotNull(messageHeaders);
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(messageParameters);
+ Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+ String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+ LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
+ // serialize payload
+ StringWriter sw = new StringWriter();
+ objectMapper.writeValue(sw, request);
+ ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+ // create request and set headers
+ FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+ httpRequest.headers()
+ .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
+ .add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
+ .set(HttpHeaders.Names.HOST, targetAddress + ":" + targetPort)
+ .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+ return submitRequest(targetAddress, targetPort, httpRequest, messageHeaders.getResponseClass());
+ }
+
+ private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
+ return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
+ .thenApply((channel) -> {
+ try {
+ return channel.sync();
+ } catch (InterruptedException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ })
+ .thenApply((ChannelFuture::channel))
+ .thenCompose(channel -> {
+ ClientHandler handler = channel.pipeline().get(ClientHandler.class);
+ CompletableFuture<JsonNode> future = handler.getJsonFuture();
+ channel.writeAndFlush(httpRequest);
+ return future.thenComposeAsync(rawResponse -> parseResponse(rawResponse, responseClass), executor);
+ });
+ }
+
+ private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
+ CompletableFuture<P> responseFuture = new CompletableFuture<>();
+ try {
+ P response = objectMapper.treeToValue(rawResponse, responseClass);
+ responseFuture.complete(response);
+ } catch (JsonProcessingException jpe) {
+ // the received response did not matched the expected response type
+
+ // lets see if it is an ErrorResponse instead
+ try {
+ ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
+ responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
+ } catch (JsonProcessingException jpe2) {
+ // if this fails it is either the expected type or response type was wrong, most likely caused
+ // by a client/search MessageHeaders mismatch
+ LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse, jpe2);
+ responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error.", jpe2));
+ }
+ }
+ return responseFuture;
+ }
+
+ private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
+
+ private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
+
+ CompletableFuture<JsonNode> getJsonFuture() {
+ return jsonFuture;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+ if (msg instanceof FullHttpResponse) {
+ readRawResponse((FullHttpResponse) msg);
+ } else {
+ LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
+ jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
+ }
+ ctx.close();
+ }
+
+ private void readRawResponse(FullHttpResponse msg) {
+ ByteBuf content = msg.content();
+
+ JsonNode rawResponse;
+ try {
+ InputStream in = new ByteBufInputStream(content);
+ rawResponse = objectMapper.readTree(in);
+ LOG.debug("Received response {}.", rawResponse);
+ } catch (JsonParseException je) {
+ LOG.error("Response was not valid JSON.", je);
+ jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
+ return;
+ } catch (IOException ioe) {
+ LOG.error("Response could not be read.", ioe);
+ jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
+ return;
+ }
+ jsonFuture.complete(rawResponse);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
new file mode 100644
index 0000000..7bf0307
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * A configuration object for {@link RestClient}s.
+ */
+public final class RestClientConfiguration {
+
+ @Nullable
+ private final SSLEngine sslEngine;
+
+ private RestClientConfiguration(@Nullable SSLEngine sslEngine) {
+ this.sslEngine = sslEngine;
+ }
+
+ /**
+ * Returns the {@link SSLEngine} that the REST client endpoint should use.
+ *
+ * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
+ */
+
+ public SSLEngine getSslEngine() {
+ return sslEngine;
+ }
+
+ /**
+ * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
+ *
+ * @param config configuration from which the REST client endpoint configuration should be created from
+ * @return REST client endpoint configuration
+ * @throws ConfigurationException if SSL was configured incorrectly
+ */
+
+ public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
+ Preconditions.checkNotNull(config);
+
+ SSLEngine sslEngine = null;
+ boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+ if (enableSSL) {
+ try {
+ SSLContext sslContext = SSLUtils.createSSLServerContext(config);
+ if (sslContext != null) {
+ sslEngine = sslContext.createSSLEngine();
+ SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
+ sslEngine.setUseClientMode(false);
+ }
+ } catch (Exception e) {
+ throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
+ }
+ }
+
+ return new RestClientConfiguration(sslEngine);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
deleted file mode 100644
index 61e1d7b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.messages.MessageParameters;
-import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.util.RestClientException;
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLEngine;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This client is the counter-part to the {@link RestServerEndpoint}.
- */
-public class RestClientEndpoint {
- private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class);
-
- private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-
- private final String configuredTargetAddress;
- private final int configuredTargetPort;
-
- private Bootstrap bootstrap;
-
- public RestClientEndpoint(RestClientEndpointConfiguration configuration) {
- Preconditions.checkNotNull(configuration);
- this.configuredTargetAddress = configuration.getTargetRestEndpointAddress();
- this.configuredTargetPort = configuration.getTargetRestEndpointPort();
-
- SSLEngine sslEngine = configuration.getSslEngine();
- ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // SSL should be the first handler in the pipeline
- if (sslEngine != null) {
- ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
- }
-
- ch.pipeline()
- .addLast(new HttpClientCodec())
- .addLast(new HttpObjectAggregator(1024 * 1024))
- .addLast(new ClientHandler())
- .addLast(new PipelineErrorHandler(LOG));
- }
- };
- NioEventLoopGroup group = new NioEventLoopGroup(1);
-
- bootstrap = new Bootstrap();
- bootstrap
- .group(group)
- .channel(NioSocketChannel.class)
- .handler(initializer);
-
- LOG.info("Rest client endpoint started.");
- }
-
- public void shutdown() {
- LOG.info("Shutting down rest endpoint.");
- CompletableFuture<?> groupFuture = new CompletableFuture<>();
- if (bootstrap != null) {
- if (bootstrap.group() != null) {
- bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
- .addListener(ignored -> groupFuture.complete(null));
- }
- }
-
- try {
- groupFuture.get(5, TimeUnit.SECONDS);
- LOG.info("Rest endpoint shutdown complete.");
- } catch (Exception e) {
- LOG.warn("Rest endpoint shutdown failed.", e);
- }
- }
-
- public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) throws IOException {
- Preconditions.checkNotNull(messageHeaders);
- Preconditions.checkNotNull(request);
- Preconditions.checkNotNull(messageParameters);
- Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
-
- String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
-
- LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
- // serialize payload
- StringWriter sw = new StringWriter();
- objectMapper.writeValue(sw, request);
- ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
-
- // create request and set headers
- FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
- httpRequest.headers()
- .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
- .add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
- .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort)
- .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-
- return submitRequest(httpRequest, messageHeaders.getResponseClass());
- }
-
- private <P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest httpRequest, Class<P> responseClass) {
- return CompletableFuture.supplyAsync(() -> bootstrap.connect(configuredTargetAddress, configuredTargetPort))
- .thenApply((channel) -> {
- try {
- return channel.sync();
- } catch (InterruptedException e) {
- throw new FlinkRuntimeException(e);
- }
- })
- .thenApply((ChannelFuture::channel))
- .thenCompose(channel -> {
- ClientHandler handler = channel.pipeline().get(ClientHandler.class);
- CompletableFuture<JsonNode> future = handler.getJsonFuture();
- channel.writeAndFlush(httpRequest);
- return future.thenCompose(rawResponse -> parseResponse(rawResponse, responseClass));
- });
- }
-
- private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
- CompletableFuture<P> responseFuture = new CompletableFuture<>();
- try {
- P response = objectMapper.treeToValue(rawResponse, responseClass);
- responseFuture.complete(response);
- } catch (JsonProcessingException jpe) {
- // the received response did not matched the expected response type
-
- // lets see if it is an ErrorResponse instead
- try {
- ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
- responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
- } catch (JsonProcessingException jpe2) {
- // if this fails it is either the expected type or response type was wrong, most likely caused
- // by a client/search MessageHeaders mismatch
- LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse);
- responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error."));
- }
- }
- return responseFuture;
- }
-
- private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
-
- private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
-
- CompletableFuture<JsonNode> getJsonFuture() {
- return jsonFuture;
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
- if (msg instanceof FullHttpResponse) {
- readRawResponse((FullHttpResponse) msg);
- } else {
- LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
- jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
- }
- ctx.close();
- }
-
- private void readRawResponse(FullHttpResponse msg) {
- ByteBuf content = msg.content();
-
- JsonNode rawResponse;
- try {
- InputStream in = new ByteBufInputStream(content);
- rawResponse = objectMapper.readTree(in);
- LOG.debug("Received response {}.", rawResponse);
- } catch (JsonParseException je) {
- LOG.error("Response was not valid JSON.", je);
- jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
- return;
- } catch (IOException ioe) {
- LOG.error("Response could not be read.", ioe);
- jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
- return;
- }
- jsonFuture.complete(rawResponse);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
deleted file mode 100644
index 420335c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.util.ConfigurationException;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-/**
- * A configuration object for {@link RestClientEndpoint}s.
- */
-public final class RestClientEndpointConfiguration {
-
- private final String targetRestEndpointAddress;
- private final int targetRestEndpointPort;
- @Nullable
- private final SSLEngine sslEngine;
-
- private RestClientEndpointConfiguration(String targetRestEndpointAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
- this.targetRestEndpointAddress = Preconditions.checkNotNull(targetRestEndpointAddress);
- this.targetRestEndpointPort = targetRestEndpointPort;
- this.sslEngine = sslEngine;
- }
-
- /**
- * Returns the address of the REST server endpoint to connect to.
- *
- * @return REST server endpoint address
- */
- public String getTargetRestEndpointAddress() {
- return targetRestEndpointAddress;
- }
-
- /**
- * Returns the por tof the REST server endpoint to connect to.
- *
- * @return REST server endpoint port
- */
- public int getTargetRestEndpointPort() {
- return targetRestEndpointPort;
- }
-
- /**
- * Returns the {@link SSLEngine} that the REST client endpoint should use.
- *
- * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
- */
-
- public SSLEngine getSslEngine() {
- return sslEngine;
- }
-
- /**
- * Creates and returns a new {@link RestClientEndpointConfiguration} from the given {@link Configuration}.
- *
- * @param config configuration from which the REST client endpoint configuration should be created from
- * @return REST client endpoint configuration
- * @throws ConfigurationException if SSL was configured incorrectly
- */
-
- public static RestClientEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
- Preconditions.checkNotNull(config);
- String address = config.getString(RestOptions.REST_ADDRESS);
- if (address == null) {
- throw new ConfigurationException("The address of the REST server was not configured under " + RestOptions.REST_ADDRESS.key() + ".");
- }
-
- int port = config.getInteger(RestOptions.REST_PORT);
- Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
-
- SSLEngine sslEngine = null;
- boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
- if (enableSSL) {
- try {
- SSLContext sslContext = SSLUtils.createSSLServerContext(config);
- if (sslContext != null) {
- sslEngine = sslContext.createSSLEngine();
- SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
- sslEngine.setUseClientMode(false);
- }
- } catch (Exception e) {
- throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
- }
- }
-
- return new RestClientEndpointConfiguration(address, port, sslEngine);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 6670267..4a3ba89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RouterHandler;
@@ -57,7 +58,6 @@ public abstract class RestServerEndpoint {
private final String configuredAddress;
private final int configuredPort;
private final SSLEngine sslEngine;
- private final Router router = new Router();
private ServerBootstrap bootstrap;
private Channel serverChannel;
@@ -80,8 +80,10 @@ public abstract class RestServerEndpoint {
*/
public void start() {
log.info("Starting rest endpoint.");
- initializeHandlers()
- .forEach(this::registerHandler);
+
+ final Router router = new Router();
+
+ initializeHandlers().forEach(handler -> registerHandler(router, handler));
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -111,13 +113,13 @@ public abstract class RestServerEndpoint {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
- ChannelFuture ch;
+ final ChannelFuture channel;
if (configuredAddress == null) {
- ch = bootstrap.bind(configuredPort);
+ channel = bootstrap.bind(configuredPort);
} else {
- ch = bootstrap.bind(configuredAddress, configuredPort);
+ channel = bootstrap.bind(configuredAddress, configuredPort);
}
- serverChannel = ch.syncUninterruptibly().channel();
+ serverChannel = channel.syncUninterruptibly().channel();
InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
String address = bindAddress.getAddress().getHostAddress();
@@ -126,17 +128,6 @@ public abstract class RestServerEndpoint {
log.info("Rest endpoint listening at {}" + ':' + "{}", address, port);
}
- private <R extends RequestBody, P extends ResponseBody> void registerHandler(AbstractRestHandler<R, P, ?> handler) {
- switch (handler.getMessageHeaders().getHttpMethod()) {
- case GET:
- router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
- break;
- case POST:
- router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
- break;
- }
- }
-
/**
* Returns the address on which this endpoint is accepting requests.
*
@@ -158,7 +149,7 @@ public abstract class RestServerEndpoint {
/**
* Stops this REST server endpoint.
*/
- public void shutdown() {
+ public void shutdown(Time timeout) {
log.info("Shutting down rest endpoint.");
CompletableFuture<?> channelFuture = new CompletableFuture<>();
@@ -171,22 +162,36 @@ public abstract class RestServerEndpoint {
channelFuture.thenRun(() -> {
if (bootstrap != null) {
if (bootstrap.group() != null) {
- bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+ bootstrap.group().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
.addListener(ignored -> groupFuture.complete(null));
}
if (bootstrap.childGroup() != null) {
- bootstrap.childGroup().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+ bootstrap.childGroup().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
.addListener(ignored -> childGroupFuture.complete(null));
}
+ } else {
+ // complete the group futures since there is nothing to stop
+ groupFuture.complete(null);
+ childGroupFuture.complete(null);
}
});
try {
- CompletableFuture.allOf(groupFuture, childGroupFuture)
- .get(10, TimeUnit.SECONDS);
+ CompletableFuture.allOf(groupFuture, childGroupFuture).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
log.info("Rest endpoint shutdown complete.");
} catch (Exception e) {
log.warn("Rest endpoint shutdown failed.", e);
}
}
+
+ private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<R, P, ?> handler) {
+ switch (handler.getMessageHeaders().getHttpMethod()) {
+ case GET:
+ router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+ break;
+ case POST:
+ router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+ break;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index f910f2c..f342a01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -40,9 +40,11 @@ public final class RestServerEndpointConfiguration {
@Nullable
private final SSLEngine sslEngine;
- private RestServerEndpointConfiguration(@Nullable String restBindAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
+ private RestServerEndpointConfiguration(@Nullable String restBindAddress, int restBindPort, @Nullable SSLEngine sslEngine) {
this.restBindAddress = restBindAddress;
- this.restBindPort = targetRestEndpointPort;
+
+ Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");
+ this.restBindPort = restBindPort;
this.sslEngine = sslEngine;
}
@@ -85,7 +87,6 @@ public final class RestServerEndpointConfiguration {
String address = config.getString(RestOptions.REST_ADDRESS);
int port = config.getInteger(RestOptions.REST_PORT);
- Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
SSLEngine sslEngine = null;
boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 07fce62..23e2918 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -86,7 +86,10 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
@Override
protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception {
- log.debug("Received request.");
+ if (log.isDebugEnabled()) {
+ log.debug("Received request " + routed.request().getUri() + '.');
+ }
+
final HttpRequest httpRequest = routed.request();
try {
@@ -124,9 +127,6 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
try {
HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
response = handleRequest(handlerRequest);
- } catch (RestHandlerException rhe) {
- sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
- return;
} catch (Exception e) {
response = FutureUtils.completedExceptionally(e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 90cc3e7..fa17b24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -38,30 +38,36 @@ import java.util.StringJoiner;
public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
private final R requestBody;
- private final Map<Class<? extends MessagePathParameter>, MessagePathParameter<?>> pathParameters = new HashMap<>();
- private final Map<Class<? extends MessageQueryParameter>, MessageQueryParameter<?>> queryParameters = new HashMap<>();
+ private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
+ private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
- public HandlerRequest(R requestBody, M messageParameters, Map<String, String> pathParameters, Map<String, List<String>> queryParameters) {
+ public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) {
this.requestBody = Preconditions.checkNotNull(requestBody);
Preconditions.checkNotNull(messageParameters);
- Preconditions.checkNotNull(queryParameters);
- Preconditions.checkNotNull(pathParameters);
+ Preconditions.checkNotNull(receivedQueryParameters);
+ Preconditions.checkNotNull(receivedPathParameters);
for (MessagePathParameter<?> pathParameter : messageParameters.getPathParameters()) {
- String value = pathParameters.get(pathParameter.getKey());
+ String value = receivedPathParameters.get(pathParameter.getKey());
if (value != null) {
pathParameter.resolveFromString(value);
- this.pathParameters.put(pathParameter.getClass(), pathParameter);
+
+ @SuppressWarnings("unchecked")
+ Class<? extends MessagePathParameter<?>> clazz = (Class<? extends MessagePathParameter<?>>) pathParameter.getClass();
+ pathParameters.put(clazz, pathParameter);
}
}
for (MessageQueryParameter<?> queryParameter : messageParameters.getQueryParameters()) {
- List<String> values = queryParameters.get(queryParameter.getKey());
- if (values != null && values.size() > 0) {
+ List<String> values = receivedQueryParameters.get(queryParameter.getKey());
+ if (values != null && !values.isEmpty()) {
StringJoiner joiner = new StringJoiner(",");
values.forEach(joiner::add);
queryParameter.resolveFromString(joiner.toString());
- this.queryParameters.put(queryParameter.getClass(), queryParameter);
+
+ @SuppressWarnings("unchecked")
+ Class<? extends MessageQueryParameter<?>> clazz = (Class<? extends MessageQueryParameter<?>>) queryParameter.getClass();
+ queryParameters.put(clazz, queryParameter);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
index 742931b..14e643c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -44,12 +44,12 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpReques
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest message) {
// we can't deal with this message. No one in the pipeline handled it. Log it.
- logger.debug("Unknown message received: {}", message);
+ logger.warn("Unknown message received: {}", message);
AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- logger.debug("Unhandled exception: {}", cause);
+ logger.warn("Unhandled exception", cause);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
index a235f7e..9285f25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -24,6 +24,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
* An exception that is thrown if the failure of a REST operation was detected by a handler.
*/
public class RestHandlerException extends Exception {
+ private static final long serialVersionUID = -1358206297964070876L;
+
private final String errorMessage;
private final HttpResponseStatus errorCode;
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
index b422d87..e681e38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Preconditions;
*
* <p>All parameters support symmetric conversion from their actual type and string via {@link #convertFromString(String)}
* and {@link #convertToString(Object)}. The conversion from {@code X} to string is required on the client to assemble the
- * URL, whereas the conversion from string to {@code X} is required on the client to provide properly typed parameters
+ * URL, whereas the conversion from string to {@code X} is required on the server to provide properly typed parameters
* to the handlers.
*
* @see MessagePathParameter
@@ -43,8 +43,8 @@ public abstract class MessageParameter<X> {
private X value;
MessageParameter(String key, MessageParameterRequisiteness requisiteness) {
- this.key = key;
- this.requisiteness = requisiteness;
+ this.key = Preconditions.checkNotNull(key);
+ this.requisiteness = Preconditions.checkNotNull(requisiteness);
}
/**
@@ -63,7 +63,7 @@ public abstract class MessageParameter<X> {
*/
public final void resolve(X value) {
Preconditions.checkState(!resolved, "This parameter was already resolved.");
- this.value = value;
+ this.value = Preconditions.checkNotNull(value);
this.resolved = true;
}
@@ -102,7 +102,7 @@ public abstract class MessageParameter<X> {
}
/**
- * Returs the resolved value of this parameter, or {@code null} if it isn't resolved yet.
+ * Returns the resolved value of this parameter, or {@code null} if it isn't resolved yet.
*
* @return resolved value, or null if it wasn't resolved yet
*/
@@ -111,7 +111,7 @@ public abstract class MessageParameter<X> {
}
/**
- * Returs the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
+ * Returns the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
*
* @return resolved value, or null if it wasn't resolved yet
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
index 30ada54..96243c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
@@ -33,7 +33,7 @@ public abstract class MessageParameters {
*
* @return collection of all supported message path parameters
*/
- public abstract Collection<MessagePathParameter> getPathParameters();
+ public abstract Collection<MessagePathParameter<?>> getPathParameters();
/**
* Returns the collection of {@link MessageQueryParameter} that the request supports. The collection should not be
@@ -41,7 +41,7 @@ public abstract class MessageParameters {
*
* @return collection of all supported message query parameters
*/
- public abstract Collection<MessageQueryParameter> getQueryParameters();
+ public abstract Collection<MessageQueryParameter<?>> getQueryParameters();
/**
* Returns whether all mandatory parameters have been resolved.
@@ -49,8 +49,8 @@ public abstract class MessageParameters {
* @return true, if all mandatory parameters have been resolved, false otherwise
*/
public final boolean isResolved() {
- return getPathParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved())
- && getQueryParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved());
+ return getPathParameters().stream().filter(MessageParameter::isMandatory).allMatch(MessageParameter::isResolved)
+ && getQueryParameters().stream().filter(MessageParameter::isMandatory).allMatch(MessageParameter::isResolved);
}
/**
@@ -70,23 +70,29 @@ public abstract class MessageParameters {
StringBuilder path = new StringBuilder(genericUrl);
StringBuilder queryParameters = new StringBuilder();
- for (MessageParameter pathParameter : parameters.getPathParameters()) {
+ for (MessageParameter<?> pathParameter : parameters.getPathParameters()) {
if (pathParameter.isResolved()) {
- int start = path.indexOf(":" + pathParameter.getKey());
- path.replace(start, start + pathParameter.getKey().length() + 1, pathParameter.getValueAsString());
+ int start = path.indexOf(':' + pathParameter.getKey());
+
+ final String pathValue = Preconditions.checkNotNull(pathParameter.getValueAsString());
+
+ // only replace path parameters if they are present
+ if (start != -1) {
+ path.replace(start, start + pathParameter.getKey().length() + 1, pathValue);
+ }
}
}
boolean isFirstQueryParameter = true;
- for (MessageQueryParameter queryParameter : parameters.getQueryParameters()) {
+ for (MessageQueryParameter<?> queryParameter : parameters.getQueryParameters()) {
if (parameters.isResolved()) {
if (isFirstQueryParameter) {
- queryParameters.append("?");
+ queryParameters.append('?');
isFirstQueryParameter = false;
} else {
- queryParameters.append("&");
+ queryParameters.append('&');
}
queryParameters.append(queryParameter.getKey());
- queryParameters.append("=");
+ queryParameters.append('=');
queryParameters.append(queryParameter.getValueAsString());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
index 10328ac..9d86b47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
@@ -24,6 +24,9 @@ import org.apache.flink.util.FlinkException;
* An exception that is thrown if the failure of a REST operation was detected on the client.
*/
public class RestClientException extends FlinkException {
+
+ private static final long serialVersionUID = 937914622022344423L;
+
public RestClientException(String message) {
super(message);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index e2ccfb5..c6469b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.TestLogger;
@@ -48,23 +50,24 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
- * IT cases for {@link RestClientEndpoint} and {@link RestServerEndpoint}.
+ * IT cases for {@link RestClient} and {@link RestServerEndpoint}.
*/
public class RestEndpointITCase extends TestLogger {
private static final JobID PATH_JOB_ID = new JobID();
private static final JobID QUERY_JOB_ID = new JobID();
private static final String JOB_ID_KEY = "jobid";
+ private static final Time timeout = Time.seconds(10L);
@Test
public void testEndpoints() throws ConfigurationException, IOException, InterruptedException, ExecutionException {
Configuration config = new Configuration();
RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
- RestClientEndpointConfiguration clientConfig = RestClientEndpointConfiguration.fromConfiguration(config);
+ RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig);
- RestClientEndpoint clientEndpoint = new TestRestClientEndpoint(clientConfig);
+ RestClient clientEndpoint = new TestRestClient(clientConfig);
try {
serverEndpoint.start();
@@ -76,12 +79,22 @@ public class RestEndpointITCase extends TestLogger {
// send first request and wait until the handler blocks
CompletableFuture<TestResponse> response1;
synchronized (TestHandler.LOCK) {
- response1 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(1));
+ response1 = clientEndpoint.sendRequest(
+ serverConfig.getEndpointBindAddress(),
+ serverConfig.getEndpointBindPort(),
+ new TestHeaders(),
+ parameters,
+ new TestRequest(1));
TestHandler.LOCK.wait();
}
// send second request and verify response
- CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(2));
+ CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(
+ serverConfig.getEndpointBindAddress(),
+ serverConfig.getEndpointBindPort(),
+ new TestHeaders(),
+ parameters,
+ new TestRequest(2));
Assert.assertEquals(2, response2.get().id);
// wake up blocked handler
@@ -91,8 +104,8 @@ public class RestEndpointITCase extends TestLogger {
// verify response to first request
Assert.assertEquals(1, response1.get().id);
} finally {
- clientEndpoint.shutdown();
- serverEndpoint.shutdown();
+ clientEndpoint.shutdown(timeout);
+ serverEndpoint.shutdown(timeout);
}
}
@@ -142,10 +155,10 @@ public class RestEndpointITCase extends TestLogger {
}
}
- private static class TestRestClientEndpoint extends RestClientEndpoint {
+ private static class TestRestClient extends RestClient {
- TestRestClientEndpoint(RestClientEndpointConfiguration configuration) {
- super(configuration);
+ TestRestClient(RestClientConfiguration configuration) {
+ super(configuration, TestingUtils.defaultExecutor());
}
}
@@ -205,12 +218,12 @@ public class RestEndpointITCase extends TestLogger {
private final JobIDQueryParameter jobIDQueryParameter = new JobIDQueryParameter();
@Override
- public Collection<MessagePathParameter> getPathParameters() {
+ public Collection<MessagePathParameter<?>> getPathParameters() {
return Collections.singleton(jobIDPathParameter);
}
@Override
- public Collection<MessageQueryParameter> getQueryParameters() {
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
return Collections.singleton(jobIDQueryParameter);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
index a5cfbf1..de9c80f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
@@ -29,7 +30,7 @@ import java.util.Collections;
/**
* Tests for {@link MessageParameters}.
*/
-public class MessageParametersTest {
+public class MessageParametersTest extends TestLogger {
@Test
public void testResolveUrl() {
String genericUrl = "/jobs/:jobid/state";
@@ -49,12 +50,12 @@ public class MessageParametersTest {
private final TestQueryParameter queryParameter = new TestQueryParameter();
@Override
- public Collection<MessagePathParameter> getPathParameters() {
+ public Collection<MessagePathParameter<?>> getPathParameters() {
return Collections.singleton(pathParameter);
}
@Override
- public Collection<MessageQueryParameter> getQueryParameters() {
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
return Collections.singleton(queryParameter);
}
}