You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/26 09:11:35 UTC
[3/3] flink git commit: [FLINK-7040] [rest] Introduce executor,
shutdown timeouts and future completion in failure case to
RestServerEndpoint
[FLINK-7040] [rest] Introduce executor, shutdown timeouts and future completion in failure case to RestServerEndpoint
This commit also moves the target address and target port specification to the
RestClient#sendRequest call instead of passing the connection information to the
constructor of the RestClient.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bafddd79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bafddd79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bafddd79
Branch: refs/heads/master
Commit: bafddd7985271bea2557b57bab9ca1cc457124fa
Parents: c019787
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 25 11:46:04 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 26 11:11:00 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rest/RestClient.java | 238 +++++++++++++++++++
.../runtime/rest/RestClientConfiguration.java | 81 +++++++
.../flink/runtime/rest/RestClientEndpoint.java | 235 ------------------
.../rest/RestClientEndpointConfiguration.java | 111 ---------
.../flink/runtime/rest/RestServerEndpoint.java | 51 ++--
.../rest/RestServerEndpointConfiguration.java | 7 +-
.../rest/handler/AbstractRestHandler.java | 8 +-
.../runtime/rest/handler/HandlerRequest.java | 26 +-
.../rest/handler/PipelineErrorHandler.java | 4 +-
.../rest/handler/RestHandlerException.java | 2 +
.../runtime/rest/messages/MessageParameter.java | 12 +-
.../rest/messages/MessageParameters.java | 28 ++-
.../runtime/rest/util/RestClientException.java | 3 +
.../flink/runtime/rest/RestEndpointITCase.java | 37 ++-
.../rest/messages/MessageParametersTest.java | 7 +-
15 files changed, 430 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
new file mode 100644
index 0000000..7422ece
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClient {
+ private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
+
+ private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+ // used to open connections to a rest server endpoint
+ private final Executor executor;
+
+ private Bootstrap bootstrap;
+
+ public RestClient(RestClientConfiguration configuration, Executor executor) {
+ Preconditions.checkNotNull(configuration);
+ this.executor = Preconditions.checkNotNull(executor);
+
+ SSLEngine sslEngine = configuration.getSslEngine();
+ ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ // SSL should be the first handler in the pipeline
+ if (sslEngine != null) {
+ ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+ }
+
+ ch.pipeline()
+ .addLast(new HttpClientCodec())
+ .addLast(new HttpObjectAggregator(1024 * 1024))
+ .addLast(new ClientHandler())
+ .addLast(new PipelineErrorHandler(LOG));
+ }
+ };
+ NioEventLoopGroup group = new NioEventLoopGroup(1);
+
+ bootstrap = new Bootstrap();
+ bootstrap
+ .group(group)
+ .channel(NioSocketChannel.class)
+ .handler(initializer);
+
+ LOG.info("Rest client endpoint started.");
+ }
+
+ public void shutdown(Time timeout) {
+ LOG.info("Shutting down rest endpoint.");
+ CompletableFuture<?> groupFuture = new CompletableFuture<>();
+ if (bootstrap != null) {
+ if (bootstrap.group() != null) {
+ bootstrap.group().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+ .addListener(ignored -> groupFuture.complete(null));
+ }
+ }
+
+ try {
+ groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ LOG.info("Rest endpoint shutdown complete.");
+ } catch (Exception e) {
+ LOG.warn("Rest endpoint shutdown failed.", e);
+ }
+ }
+
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+ Preconditions.checkNotNull(targetAddress);
+ Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+ Preconditions.checkNotNull(messageHeaders);
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(messageParameters);
+ Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+ String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+ LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
+ // serialize payload
+ StringWriter sw = new StringWriter();
+ objectMapper.writeValue(sw, request);
+ ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+ // create request and set headers
+ FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+ httpRequest.headers()
+ .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
+ .add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
+ .set(HttpHeaders.Names.HOST, targetAddress + ":" + targetPort)
+ .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+ return submitRequest(targetAddress, targetPort, httpRequest, messageHeaders.getResponseClass());
+ }
+
+ private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
+ return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
+ .thenApply((channel) -> {
+ try {
+ return channel.sync();
+ } catch (InterruptedException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ })
+ .thenApply((ChannelFuture::channel))
+ .thenCompose(channel -> {
+ ClientHandler handler = channel.pipeline().get(ClientHandler.class);
+ CompletableFuture<JsonNode> future = handler.getJsonFuture();
+ channel.writeAndFlush(httpRequest);
+ return future.thenComposeAsync(rawResponse -> parseResponse(rawResponse, responseClass), executor);
+ });
+ }
+
+ private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
+ CompletableFuture<P> responseFuture = new CompletableFuture<>();
+ try {
+ P response = objectMapper.treeToValue(rawResponse, responseClass);
+ responseFuture.complete(response);
+ } catch (JsonProcessingException jpe) {
+ // the received response did not matched the expected response type
+
+ // lets see if it is an ErrorResponse instead
+ try {
+ ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
+ responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
+ } catch (JsonProcessingException jpe2) {
+ // if this fails it is either the expected type or response type was wrong, most likely caused
+ // by a client/search MessageHeaders mismatch
+ LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse, jpe2);
+ responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error.", jpe2));
+ }
+ }
+ return responseFuture;
+ }
+
+ private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
+
+ private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
+
+ CompletableFuture<JsonNode> getJsonFuture() {
+ return jsonFuture;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+ if (msg instanceof FullHttpResponse) {
+ readRawResponse((FullHttpResponse) msg);
+ } else {
+ LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
+ jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
+ }
+ ctx.close();
+ }
+
+ private void readRawResponse(FullHttpResponse msg) {
+ ByteBuf content = msg.content();
+
+ JsonNode rawResponse;
+ try {
+ InputStream in = new ByteBufInputStream(content);
+ rawResponse = objectMapper.readTree(in);
+ LOG.debug("Received response {}.", rawResponse);
+ } catch (JsonParseException je) {
+ LOG.error("Response was not valid JSON.", je);
+ jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
+ return;
+ } catch (IOException ioe) {
+ LOG.error("Response could not be read.", ioe);
+ jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
+ return;
+ }
+ jsonFuture.complete(rawResponse);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
new file mode 100644
index 0000000..7bf0307
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * A configuration object for {@link RestClient}s.
+ */
+public final class RestClientConfiguration {
+
+ @Nullable
+ private final SSLEngine sslEngine;
+
+ private RestClientConfiguration(@Nullable SSLEngine sslEngine) {
+ this.sslEngine = sslEngine;
+ }
+
+ /**
+ * Returns the {@link SSLEngine} that the REST client endpoint should use.
+ *
+ * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
+ */
+
+ public SSLEngine getSslEngine() {
+ return sslEngine;
+ }
+
+ /**
+ * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
+ *
+ * @param config configuration from which the REST client endpoint configuration should be created from
+ * @return REST client endpoint configuration
+ * @throws ConfigurationException if SSL was configured incorrectly
+ */
+
+ public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
+ Preconditions.checkNotNull(config);
+
+ SSLEngine sslEngine = null;
+ boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+ if (enableSSL) {
+ try {
+ SSLContext sslContext = SSLUtils.createSSLServerContext(config);
+ if (sslContext != null) {
+ sslEngine = sslContext.createSSLEngine();
+ SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
+ sslEngine.setUseClientMode(false);
+ }
+ } catch (Exception e) {
+ throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
+ }
+ }
+
+ return new RestClientConfiguration(sslEngine);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
deleted file mode 100644
index 61e1d7b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.messages.MessageParameters;
-import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.util.RestClientException;
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLEngine;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This client is the counter-part to the {@link RestServerEndpoint}.
- */
-public class RestClientEndpoint {
- private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class);
-
- private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-
- private final String configuredTargetAddress;
- private final int configuredTargetPort;
-
- private Bootstrap bootstrap;
-
- public RestClientEndpoint(RestClientEndpointConfiguration configuration) {
- Preconditions.checkNotNull(configuration);
- this.configuredTargetAddress = configuration.getTargetRestEndpointAddress();
- this.configuredTargetPort = configuration.getTargetRestEndpointPort();
-
- SSLEngine sslEngine = configuration.getSslEngine();
- ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // SSL should be the first handler in the pipeline
- if (sslEngine != null) {
- ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
- }
-
- ch.pipeline()
- .addLast(new HttpClientCodec())
- .addLast(new HttpObjectAggregator(1024 * 1024))
- .addLast(new ClientHandler())
- .addLast(new PipelineErrorHandler(LOG));
- }
- };
- NioEventLoopGroup group = new NioEventLoopGroup(1);
-
- bootstrap = new Bootstrap();
- bootstrap
- .group(group)
- .channel(NioSocketChannel.class)
- .handler(initializer);
-
- LOG.info("Rest client endpoint started.");
- }
-
- public void shutdown() {
- LOG.info("Shutting down rest endpoint.");
- CompletableFuture<?> groupFuture = new CompletableFuture<>();
- if (bootstrap != null) {
- if (bootstrap.group() != null) {
- bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
- .addListener(ignored -> groupFuture.complete(null));
- }
- }
-
- try {
- groupFuture.get(5, TimeUnit.SECONDS);
- LOG.info("Rest endpoint shutdown complete.");
- } catch (Exception e) {
- LOG.warn("Rest endpoint shutdown failed.", e);
- }
- }
-
- public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) throws IOException {
- Preconditions.checkNotNull(messageHeaders);
- Preconditions.checkNotNull(request);
- Preconditions.checkNotNull(messageParameters);
- Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
-
- String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
-
- LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
- // serialize payload
- StringWriter sw = new StringWriter();
- objectMapper.writeValue(sw, request);
- ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
-
- // create request and set headers
- FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
- httpRequest.headers()
- .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
- .add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
- .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort)
- .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-
- return submitRequest(httpRequest, messageHeaders.getResponseClass());
- }
-
- private <P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest httpRequest, Class<P> responseClass) {
- return CompletableFuture.supplyAsync(() -> bootstrap.connect(configuredTargetAddress, configuredTargetPort))
- .thenApply((channel) -> {
- try {
- return channel.sync();
- } catch (InterruptedException e) {
- throw new FlinkRuntimeException(e);
- }
- })
- .thenApply((ChannelFuture::channel))
- .thenCompose(channel -> {
- ClientHandler handler = channel.pipeline().get(ClientHandler.class);
- CompletableFuture<JsonNode> future = handler.getJsonFuture();
- channel.writeAndFlush(httpRequest);
- return future.thenCompose(rawResponse -> parseResponse(rawResponse, responseClass));
- });
- }
-
- private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
- CompletableFuture<P> responseFuture = new CompletableFuture<>();
- try {
- P response = objectMapper.treeToValue(rawResponse, responseClass);
- responseFuture.complete(response);
- } catch (JsonProcessingException jpe) {
- // the received response did not matched the expected response type
-
- // lets see if it is an ErrorResponse instead
- try {
- ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
- responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
- } catch (JsonProcessingException jpe2) {
- // if this fails it is either the expected type or response type was wrong, most likely caused
- // by a client/search MessageHeaders mismatch
- LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse);
- responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error."));
- }
- }
- return responseFuture;
- }
-
- private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
-
- private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
-
- CompletableFuture<JsonNode> getJsonFuture() {
- return jsonFuture;
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
- if (msg instanceof FullHttpResponse) {
- readRawResponse((FullHttpResponse) msg);
- } else {
- LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
- jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
- }
- ctx.close();
- }
-
- private void readRawResponse(FullHttpResponse msg) {
- ByteBuf content = msg.content();
-
- JsonNode rawResponse;
- try {
- InputStream in = new ByteBufInputStream(content);
- rawResponse = objectMapper.readTree(in);
- LOG.debug("Received response {}.", rawResponse);
- } catch (JsonParseException je) {
- LOG.error("Response was not valid JSON.", je);
- jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
- return;
- } catch (IOException ioe) {
- LOG.error("Response could not be read.", ioe);
- jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
- return;
- }
- jsonFuture.complete(rawResponse);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
deleted file mode 100644
index 420335c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.util.ConfigurationException;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-/**
- * A configuration object for {@link RestClientEndpoint}s.
- */
-public final class RestClientEndpointConfiguration {
-
- private final String targetRestEndpointAddress;
- private final int targetRestEndpointPort;
- @Nullable
- private final SSLEngine sslEngine;
-
- private RestClientEndpointConfiguration(String targetRestEndpointAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
- this.targetRestEndpointAddress = Preconditions.checkNotNull(targetRestEndpointAddress);
- this.targetRestEndpointPort = targetRestEndpointPort;
- this.sslEngine = sslEngine;
- }
-
- /**
- * Returns the address of the REST server endpoint to connect to.
- *
- * @return REST server endpoint address
- */
- public String getTargetRestEndpointAddress() {
- return targetRestEndpointAddress;
- }
-
- /**
- * Returns the por tof the REST server endpoint to connect to.
- *
- * @return REST server endpoint port
- */
- public int getTargetRestEndpointPort() {
- return targetRestEndpointPort;
- }
-
- /**
- * Returns the {@link SSLEngine} that the REST client endpoint should use.
- *
- * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
- */
-
- public SSLEngine getSslEngine() {
- return sslEngine;
- }
-
- /**
- * Creates and returns a new {@link RestClientEndpointConfiguration} from the given {@link Configuration}.
- *
- * @param config configuration from which the REST client endpoint configuration should be created from
- * @return REST client endpoint configuration
- * @throws ConfigurationException if SSL was configured incorrectly
- */
-
- public static RestClientEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
- Preconditions.checkNotNull(config);
- String address = config.getString(RestOptions.REST_ADDRESS);
- if (address == null) {
- throw new ConfigurationException("The address of the REST server was not configured under " + RestOptions.REST_ADDRESS.key() + ".");
- }
-
- int port = config.getInteger(RestOptions.REST_PORT);
- Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
-
- SSLEngine sslEngine = null;
- boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
- if (enableSSL) {
- try {
- SSLContext sslContext = SSLUtils.createSSLServerContext(config);
- if (sslContext != null) {
- sslEngine = sslContext.createSSLEngine();
- SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
- sslEngine.setUseClientMode(false);
- }
- } catch (Exception e) {
- throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
- }
- }
-
- return new RestClientEndpointConfiguration(address, port, sslEngine);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 6670267..4a3ba89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RouterHandler;
@@ -57,7 +58,6 @@ public abstract class RestServerEndpoint {
private final String configuredAddress;
private final int configuredPort;
private final SSLEngine sslEngine;
- private final Router router = new Router();
private ServerBootstrap bootstrap;
private Channel serverChannel;
@@ -80,8 +80,10 @@ public abstract class RestServerEndpoint {
*/
public void start() {
log.info("Starting rest endpoint.");
- initializeHandlers()
- .forEach(this::registerHandler);
+
+ final Router router = new Router();
+
+ initializeHandlers().forEach(handler -> registerHandler(router, handler));
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -111,13 +113,13 @@ public abstract class RestServerEndpoint {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
- ChannelFuture ch;
+ final ChannelFuture channel;
if (configuredAddress == null) {
- ch = bootstrap.bind(configuredPort);
+ channel = bootstrap.bind(configuredPort);
} else {
- ch = bootstrap.bind(configuredAddress, configuredPort);
+ channel = bootstrap.bind(configuredAddress, configuredPort);
}
- serverChannel = ch.syncUninterruptibly().channel();
+ serverChannel = channel.syncUninterruptibly().channel();
InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
String address = bindAddress.getAddress().getHostAddress();
@@ -126,17 +128,6 @@ public abstract class RestServerEndpoint {
log.info("Rest endpoint listening at {}" + ':' + "{}", address, port);
}
- private <R extends RequestBody, P extends ResponseBody> void registerHandler(AbstractRestHandler<R, P, ?> handler) {
- switch (handler.getMessageHeaders().getHttpMethod()) {
- case GET:
- router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
- break;
- case POST:
- router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
- break;
- }
- }
-
/**
* Returns the address on which this endpoint is accepting requests.
*
@@ -158,7 +149,7 @@ public abstract class RestServerEndpoint {
/**
* Stops this REST server endpoint.
*/
- public void shutdown() {
+ public void shutdown(Time timeout) {
log.info("Shutting down rest endpoint.");
CompletableFuture<?> channelFuture = new CompletableFuture<>();
@@ -171,22 +162,36 @@ public abstract class RestServerEndpoint {
channelFuture.thenRun(() -> {
if (bootstrap != null) {
if (bootstrap.group() != null) {
- bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+ bootstrap.group().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
.addListener(ignored -> groupFuture.complete(null));
}
if (bootstrap.childGroup() != null) {
- bootstrap.childGroup().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+ bootstrap.childGroup().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
.addListener(ignored -> childGroupFuture.complete(null));
}
+ } else {
+ // complete the group futures since there is nothing to stop
+ groupFuture.complete(null);
+ childGroupFuture.complete(null);
}
});
try {
- CompletableFuture.allOf(groupFuture, childGroupFuture)
- .get(10, TimeUnit.SECONDS);
+ CompletableFuture.allOf(groupFuture, childGroupFuture).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
log.info("Rest endpoint shutdown complete.");
} catch (Exception e) {
log.warn("Rest endpoint shutdown failed.", e);
}
}
+
+ private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<R, P, ?> handler) {
+ switch (handler.getMessageHeaders().getHttpMethod()) {
+ case GET:
+ router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+ break;
+ case POST:
+ router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+ break;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index f910f2c..f342a01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -40,9 +40,11 @@ public final class RestServerEndpointConfiguration {
@Nullable
private final SSLEngine sslEngine;
- private RestServerEndpointConfiguration(@Nullable String restBindAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
+ private RestServerEndpointConfiguration(@Nullable String restBindAddress, int restBindPort, @Nullable SSLEngine sslEngine) {
this.restBindAddress = restBindAddress;
- this.restBindPort = targetRestEndpointPort;
+
+ Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");
+ this.restBindPort = restBindPort;
this.sslEngine = sslEngine;
}
@@ -85,7 +87,6 @@ public final class RestServerEndpointConfiguration {
String address = config.getString(RestOptions.REST_ADDRESS);
int port = config.getInteger(RestOptions.REST_PORT);
- Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
SSLEngine sslEngine = null;
boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 07fce62..23e2918 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -86,7 +86,10 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
@Override
protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception {
- log.debug("Received request.");
+ if (log.isDebugEnabled()) {
+ log.debug("Received request " + routed.request().getUri() + '.');
+ }
+
final HttpRequest httpRequest = routed.request();
try {
@@ -124,9 +127,6 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
try {
HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
response = handleRequest(handlerRequest);
- } catch (RestHandlerException rhe) {
- sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
- return;
} catch (Exception e) {
response = FutureUtils.completedExceptionally(e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 90cc3e7..fa17b24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -38,30 +38,36 @@ import java.util.StringJoiner;
public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
private final R requestBody;
- private final Map<Class<? extends MessagePathParameter>, MessagePathParameter<?>> pathParameters = new HashMap<>();
- private final Map<Class<? extends MessageQueryParameter>, MessageQueryParameter<?>> queryParameters = new HashMap<>();
+ private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
+ private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
- public HandlerRequest(R requestBody, M messageParameters, Map<String, String> pathParameters, Map<String, List<String>> queryParameters) {
+ public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) {
this.requestBody = Preconditions.checkNotNull(requestBody);
Preconditions.checkNotNull(messageParameters);
- Preconditions.checkNotNull(queryParameters);
- Preconditions.checkNotNull(pathParameters);
+ Preconditions.checkNotNull(receivedQueryParameters);
+ Preconditions.checkNotNull(receivedPathParameters);
for (MessagePathParameter<?> pathParameter : messageParameters.getPathParameters()) {
- String value = pathParameters.get(pathParameter.getKey());
+ String value = receivedPathParameters.get(pathParameter.getKey());
if (value != null) {
pathParameter.resolveFromString(value);
- this.pathParameters.put(pathParameter.getClass(), pathParameter);
+
+ @SuppressWarnings("unchecked")
+ Class<? extends MessagePathParameter<?>> clazz = (Class<? extends MessagePathParameter<?>>) pathParameter.getClass();
+ pathParameters.put(clazz, pathParameter);
}
}
for (MessageQueryParameter<?> queryParameter : messageParameters.getQueryParameters()) {
- List<String> values = queryParameters.get(queryParameter.getKey());
- if (values != null && values.size() > 0) {
+ List<String> values = receivedQueryParameters.get(queryParameter.getKey());
+ if (values != null && !values.isEmpty()) {
StringJoiner joiner = new StringJoiner(",");
values.forEach(joiner::add);
queryParameter.resolveFromString(joiner.toString());
- this.queryParameters.put(queryParameter.getClass(), queryParameter);
+
+ @SuppressWarnings("unchecked")
+ Class<? extends MessageQueryParameter<?>> clazz = (Class<? extends MessageQueryParameter<?>>) queryParameter.getClass();
+ queryParameters.put(clazz, queryParameter);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
index 742931b..14e643c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -44,12 +44,12 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpReques
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest message) {
// we can't deal with this message. No one in the pipeline handled it. Log it.
- logger.debug("Unknown message received: {}", message);
+ logger.warn("Unknown message received: {}", message);
AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- logger.debug("Unhandled exception: {}", cause);
+ logger.warn("Unhandled exception", cause);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
index a235f7e..9285f25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -24,6 +24,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
* An exception that is thrown if the failure of a REST operation was detected by a handler.
*/
public class RestHandlerException extends Exception {
+ private static final long serialVersionUID = -1358206297964070876L;
+
private final String errorMessage;
private final HttpResponseStatus errorCode;
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
index b422d87..e681e38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Preconditions;
*
* <p>All parameters support symmetric conversion from their actual type and string via {@link #convertFromString(String)}
* and {@link #convertToString(Object)}. The conversion from {@code X} to string is required on the client to assemble the
- * URL, whereas the conversion from string to {@code X} is required on the client to provide properly typed parameters
+ * URL, whereas the conversion from string to {@code X} is required on the server to provide properly typed parameters
* to the handlers.
*
* @see MessagePathParameter
@@ -43,8 +43,8 @@ public abstract class MessageParameter<X> {
private X value;
MessageParameter(String key, MessageParameterRequisiteness requisiteness) {
- this.key = key;
- this.requisiteness = requisiteness;
+ this.key = Preconditions.checkNotNull(key);
+ this.requisiteness = Preconditions.checkNotNull(requisiteness);
}
/**
@@ -63,7 +63,7 @@ public abstract class MessageParameter<X> {
*/
public final void resolve(X value) {
Preconditions.checkState(!resolved, "This parameter was already resolved.");
- this.value = value;
+ this.value = Preconditions.checkNotNull(value);
this.resolved = true;
}
@@ -102,7 +102,7 @@ public abstract class MessageParameter<X> {
}
/**
- * Returs the resolved value of this parameter, or {@code null} if it isn't resolved yet.
+ * Returns the resolved value of this parameter, or {@code null} if it isn't resolved yet.
*
* @return resolved value, or null if it wasn't resolved yet
*/
@@ -111,7 +111,7 @@ public abstract class MessageParameter<X> {
}
/**
- * Returs the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
+ * Returns the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
*
* @return resolved value, or null if it wasn't resolved yet
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
index 30ada54..96243c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
@@ -33,7 +33,7 @@ public abstract class MessageParameters {
*
* @return collection of all supported message path parameters
*/
- public abstract Collection<MessagePathParameter> getPathParameters();
+ public abstract Collection<MessagePathParameter<?>> getPathParameters();
/**
* Returns the collection of {@link MessageQueryParameter} that the request supports. The collection should not be
@@ -41,7 +41,7 @@ public abstract class MessageParameters {
*
* @return collection of all supported message query parameters
*/
- public abstract Collection<MessageQueryParameter> getQueryParameters();
+ public abstract Collection<MessageQueryParameter<?>> getQueryParameters();
/**
* Returns whether all mandatory parameters have been resolved.
@@ -49,8 +49,8 @@ public abstract class MessageParameters {
* @return true, if all mandatory parameters have been resolved, false otherwise
*/
public final boolean isResolved() {
- return getPathParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved())
- && getQueryParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved());
+ return getPathParameters().stream().filter(MessageParameter::isMandatory).allMatch(MessageParameter::isResolved)
+ && getQueryParameters().stream().filter(MessageParameter::isMandatory).allMatch(MessageParameter::isResolved);
}
/**
@@ -70,23 +70,29 @@ public abstract class MessageParameters {
StringBuilder path = new StringBuilder(genericUrl);
StringBuilder queryParameters = new StringBuilder();
- for (MessageParameter pathParameter : parameters.getPathParameters()) {
+ for (MessageParameter<?> pathParameter : parameters.getPathParameters()) {
if (pathParameter.isResolved()) {
- int start = path.indexOf(":" + pathParameter.getKey());
- path.replace(start, start + pathParameter.getKey().length() + 1, pathParameter.getValueAsString());
+ int start = path.indexOf(':' + pathParameter.getKey());
+
+ final String pathValue = Preconditions.checkNotNull(pathParameter.getValueAsString());
+
+ // only replace path parameters if they are present
+ if (start != -1) {
+ path.replace(start, start + pathParameter.getKey().length() + 1, pathValue);
+ }
}
}
boolean isFirstQueryParameter = true;
- for (MessageQueryParameter queryParameter : parameters.getQueryParameters()) {
+ for (MessageQueryParameter<?> queryParameter : parameters.getQueryParameters()) {
if (parameters.isResolved()) {
if (isFirstQueryParameter) {
- queryParameters.append("?");
+ queryParameters.append('?');
isFirstQueryParameter = false;
} else {
- queryParameters.append("&");
+ queryParameters.append('&');
}
queryParameters.append(queryParameter.getKey());
- queryParameters.append("=");
+ queryParameters.append('=');
queryParameters.append(queryParameter.getValueAsString());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
index 10328ac..9d86b47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
@@ -24,6 +24,9 @@ import org.apache.flink.util.FlinkException;
* An exception that is thrown if the failure of a REST operation was detected on the client.
*/
public class RestClientException extends FlinkException {
+
+ private static final long serialVersionUID = 937914622022344423L;
+
public RestClientException(String message) {
super(message);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index e2ccfb5..c6469b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.TestLogger;
@@ -48,23 +50,24 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
- * IT cases for {@link RestClientEndpoint} and {@link RestServerEndpoint}.
+ * IT cases for {@link RestClient} and {@link RestServerEndpoint}.
*/
public class RestEndpointITCase extends TestLogger {
private static final JobID PATH_JOB_ID = new JobID();
private static final JobID QUERY_JOB_ID = new JobID();
private static final String JOB_ID_KEY = "jobid";
+ private static final Time timeout = Time.seconds(10L);
@Test
public void testEndpoints() throws ConfigurationException, IOException, InterruptedException, ExecutionException {
Configuration config = new Configuration();
RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
- RestClientEndpointConfiguration clientConfig = RestClientEndpointConfiguration.fromConfiguration(config);
+ RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig);
- RestClientEndpoint clientEndpoint = new TestRestClientEndpoint(clientConfig);
+ RestClient clientEndpoint = new TestRestClient(clientConfig);
try {
serverEndpoint.start();
@@ -76,12 +79,22 @@ public class RestEndpointITCase extends TestLogger {
// send first request and wait until the handler blocks
CompletableFuture<TestResponse> response1;
synchronized (TestHandler.LOCK) {
- response1 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(1));
+ response1 = clientEndpoint.sendRequest(
+ serverConfig.getEndpointBindAddress(),
+ serverConfig.getEndpointBindPort(),
+ new TestHeaders(),
+ parameters,
+ new TestRequest(1));
TestHandler.LOCK.wait();
}
// send second request and verify response
- CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(2));
+ CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(
+ serverConfig.getEndpointBindAddress(),
+ serverConfig.getEndpointBindPort(),
+ new TestHeaders(),
+ parameters,
+ new TestRequest(2));
Assert.assertEquals(2, response2.get().id);
// wake up blocked handler
@@ -91,8 +104,8 @@ public class RestEndpointITCase extends TestLogger {
// verify response to first request
Assert.assertEquals(1, response1.get().id);
} finally {
- clientEndpoint.shutdown();
- serverEndpoint.shutdown();
+ clientEndpoint.shutdown(timeout);
+ serverEndpoint.shutdown(timeout);
}
}
@@ -142,10 +155,10 @@ public class RestEndpointITCase extends TestLogger {
}
}
- private static class TestRestClientEndpoint extends RestClientEndpoint {
+ private static class TestRestClient extends RestClient {
- TestRestClientEndpoint(RestClientEndpointConfiguration configuration) {
- super(configuration);
+ TestRestClient(RestClientConfiguration configuration) {
+ super(configuration, TestingUtils.defaultExecutor());
}
}
@@ -205,12 +218,12 @@ public class RestEndpointITCase extends TestLogger {
private final JobIDQueryParameter jobIDQueryParameter = new JobIDQueryParameter();
@Override
- public Collection<MessagePathParameter> getPathParameters() {
+ public Collection<MessagePathParameter<?>> getPathParameters() {
return Collections.singleton(jobIDPathParameter);
}
@Override
- public Collection<MessageQueryParameter> getQueryParameters() {
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
return Collections.singleton(jobIDQueryParameter);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
index a5cfbf1..de9c80f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
@@ -29,7 +30,7 @@ import java.util.Collections;
/**
* Tests for {@link MessageParameters}.
*/
-public class MessageParametersTest {
+public class MessageParametersTest extends TestLogger {
@Test
public void testResolveUrl() {
String genericUrl = "/jobs/:jobid/state";
@@ -49,12 +50,12 @@ public class MessageParametersTest {
private final TestQueryParameter queryParameter = new TestQueryParameter();
@Override
- public Collection<MessagePathParameter> getPathParameters() {
+ public Collection<MessagePathParameter<?>> getPathParameters() {
return Collections.singleton(pathParameter);
}
@Override
- public Collection<MessageQueryParameter> getQueryParameters() {
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
return Collections.singleton(queryParameter);
}
}