You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/18 20:59:40 UTC
flink git commit: [FLINK-7458] Generalize GatewayRetriever for WebRuntimeMonitor
Repository: flink
Updated Branches:
refs/heads/master 1269f75ee -> 6ad0d3519
[FLINK-7458] Generalize GatewayRetriever for WebRuntimeMonitor
Introduce a generalized GatewayRetriever replacing the JobManagerRetriever. The
GatewayRetriever fulfills the same purpose as the JobManagerRetriever with the
ability to retrieve the gateway for an arbitrary endpoint type.
This closes #4549.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ad0d351
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ad0d351
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ad0d351
Branch: refs/heads/master
Commit: 6ad0d3519585a825499245493f4bc77dac77d439
Parents: 1269f75
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 15 12:00:58 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 18 15:15:30 2017 +0200
----------------------------------------------------------------------
.../webmonitor/RuntimeMonitorHandler.java | 27 +---
.../webmonitor/RuntimeMonitorHandlerBase.java | 50 +++++--
.../runtime/webmonitor/WebRuntimeMonitor.java | 13 +-
.../files/StaticFileServerHandler.java | 60 +++++---
.../handlers/HandlerRedirectUtils.java | 54 ++++++--
.../handlers/TaskManagerLogHandler.java | 4 +-
.../webmonitor/metrics/MetricFetcher.java | 8 +-
.../webmonitor/WebRuntimeMonitorITCase.java | 5 +-
.../handlers/HandlerRedirectUtilsTest.java | 15 +-
.../handlers/TaskManagerLogHandlerTest.java | 10 +-
.../metrics/AbstractMetricsHandlerTest.java | 8 +-
.../metrics/JobManagerMetricsHandlerTest.java | 6 +-
.../metrics/JobMetricsHandlerTest.java | 6 +-
.../metrics/JobVertexMetricsHandlerTest.java | 6 +-
.../webmonitor/metrics/MetricFetcherTest.java | 5 +-
.../metrics/TaskManagerMetricsHandlerTest.java | 6 +-
.../runtime/akka/AkkaJobManagerGateway.java | 6 +
.../clusterframework/BootstrapTools.java | 5 +-
.../runtime/jobmaster/JobManagerGateway.java | 4 +-
.../runtime/webmonitor/RestfulGateway.java | 42 ++++++
.../runtime/webmonitor/WebMonitorUtils.java | 7 +-
.../webmonitor/retriever/GatewayRetriever.java | 61 +++++++++
.../retriever/JobManagerRetriever.java | 123 -----------------
.../retriever/LeaderGatewayRetriever.java | 55 ++++++++
.../webmonitor/retriever/LeaderRetriever.java | 112 +++++++++++++++
.../retriever/impl/AkkaJobManagerRetriever.java | 35 ++---
.../retriever/impl/RpcGatewayRetriever.java | 50 +++++++
.../retriever/impl/RpcJobManagerRetriever.java | 46 -------
.../impl/AkkaJobManagerRetrieverTest.java | 105 ++++++++++++++
.../retriever/impl/RpcGatewayRetrieverTest.java | 136 +++++++++++++++++++
30 files changed, 771 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 6305537..d3fc177 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -19,22 +19,18 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
@@ -43,12 +39,12 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
-import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import static org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils.ENCODING;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -62,8 +58,6 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class);
- private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
-
public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";
private final RequestHandler handler;
@@ -73,7 +67,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
public RuntimeMonitorHandler(
WebMonitorConfig cfg,
RequestHandler handler,
- JobManagerRetriever retriever,
+ GatewayRetriever<JobManagerGateway> retriever,
CompletableFuture<String> localJobManagerAddressFuture,
Time timeout,
boolean httpsEnabled) {
@@ -124,18 +118,9 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
if (optNotFound.isPresent()) {
// this should result in a 404 error code (not found)
- Throwable e = optNotFound.get();
- ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
- : Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
- finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
- finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
- finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+ finalResponse = HandlerRedirectUtils.getResponse(HttpResponseStatus.NOT_FOUND, optNotFound.get().getMessage());
} else {
- byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
- finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
- HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
- finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
- finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+ finalResponse = HandlerRedirectUtils.getErrorResponse(throwable);
}
} else {
finalResponse = httpResponse;
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index 4cb55f1..4f4facf 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -31,6 +31,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -46,7 +49,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@ChannelHandler.Sharable
public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHandler<Routed> {
- private final JobManagerRetriever retriever;
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final GatewayRetriever<JobManagerGateway> retriever;
protected final CompletableFuture<String> localJobManagerAddressFuture;
@@ -58,7 +63,7 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
protected String localJobManagerAddress;
public RuntimeMonitorHandlerBase(
- JobManagerRetriever retriever,
+ GatewayRetriever<JobManagerGateway> retriever,
CompletableFuture<String> localJobManagerAddressFuture,
Time timeout,
boolean httpsEnabled) {
@@ -83,19 +88,38 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
- Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
if (optJobManagerGateway.isPresent()) {
JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
- String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
- localJobManagerAddress, jobManagerGateway, timeout);
-
- if (redirectAddress != null) {
- HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(),
- httpsEnabled);
- KeepAliveWrite.flush(ctx, routed.request(), redirect);
- }
- else {
+ Optional<CompletableFuture<String>> optRedirectAddress = HandlerRedirectUtils.getRedirectAddress(
+ localJobManagerAddress,
+ jobManagerGateway,
+ timeout);
+
+ if (optRedirectAddress.isPresent()) {
+ optRedirectAddress.get().whenComplete(
+ (String redirectAddress, Throwable throwable) -> {
+ HttpResponse response;
+
+ if (throwable != null) {
+ logger.error("Could not retrieve the redirect address.", throwable);
+ response = HandlerRedirectUtils.getErrorResponse(throwable);
+ } else {
+ try {
+ response = HandlerRedirectUtils.getRedirectResponse(
+ redirectAddress,
+ routed.path(),
+ httpsEnabled);
+ } catch (Exception e) {
+ logger.error("Could not create the redirect response.", e);
+ response = HandlerRedirectUtils.getErrorResponse(e);
+ }
+ }
+
+ KeepAliveWrite.flush(ctx, routed.request(), response);
+ });
+ } else {
respondAsLeader(ctx, routed, jobManagerGateway);
}
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e74541e..0cdab9c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -70,7 +71,7 @@ import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.FileUtils;
@@ -117,7 +118,7 @@ public class WebRuntimeMonitor implements WebMonitor {
private final LeaderRetrievalService leaderRetrievalService;
/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
- private final JobManagerRetriever retriever;
+ private final LeaderGatewayRetriever<JobManagerGateway> retriever;
private final SSLContext serverSSLContext;
@@ -146,7 +147,7 @@ public class WebRuntimeMonitor implements WebMonitor {
Configuration config,
LeaderRetrievalService leaderRetrievalService,
BlobView blobView,
- JobManagerRetriever jobManagerRetriever,
+ LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
MetricQueryServiceRetriever queryServiceRetriever,
Time timeout,
Executor executor) throws IOException, InterruptedException {
@@ -292,7 +293,11 @@ public class WebRuntimeMonitor implements WebMonitor {
router
// log and stdout
.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
- new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.logFile,
+ new StaticFileServerHandler(
+ retriever,
+ jobManagerAddressFuture,
+ timeout,
+ logFiles.logFile,
enableSSL))
.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 15acb00..1564064 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -27,9 +27,9 @@ package org.apache.flink.runtime.webmonitor.files;
*****************************************************************************/
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -97,7 +97,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* example.</p>
*/
@ChannelHandler.Sharable
-public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> {
+public class StaticFileServerHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
/** Default logger, if none is specified. */
private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
@@ -113,7 +113,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
// ------------------------------------------------------------------------
- private final JobManagerRetriever retriever;
+ private final GatewayRetriever<T> retriever;
private final CompletableFuture<String> localJobManagerAddressFuture;
@@ -131,7 +131,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
private String localJobManagerAddress;
public StaticFileServerHandler(
- JobManagerRetriever retriever,
+ GatewayRetriever<T> retriever,
CompletableFuture<String> localJobManagerAddressPromise,
Time timeout,
File rootPath,
@@ -141,7 +141,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
}
public StaticFileServerHandler(
- JobManagerRetriever retriever,
+ GatewayRetriever<T> retriever,
CompletableFuture<String> localJobManagerAddressFuture,
Time timeout,
File rootPath,
@@ -168,31 +168,47 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
}
final HttpRequest request = routed.request();
- String requestPath = routed.path();
+ final String requestPath;
// make sure we request the "index.html" in case there is a directory request
- if (requestPath.endsWith("/")) {
- requestPath = requestPath + "index.html";
+ if (routed.path().endsWith("/")) {
+ requestPath = routed.path() + "index.html";
}
-
// in case the files being accessed are logs or stdout files, find appropriate paths.
- if (requestPath.equals("/jobmanager/log") || requestPath.equals("/jobmanager/stdout")) {
+ else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
requestPath = "";
+ } else {
+ requestPath = routed.path();
}
- Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+ Optional<T> optLeader = retriever.getNow();
- if (optJobManagerGateway.isPresent()) {
+ if (optLeader.isPresent()) {
// Redirect to leader if necessary
- String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
- localJobManagerAddress, optJobManagerGateway.get(), timeout);
-
- if (redirectAddress != null) {
- HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(
- redirectAddress, requestPath, httpsEnabled);
- KeepAliveWrite.flush(ctx, routed.request(), redirect);
- }
- else {
+ Optional<CompletableFuture<String>> optRedirectAddress = HandlerRedirectUtils.getRedirectAddress(
+ localJobManagerAddress,
+ optLeader.get(),
+ timeout);
+
+ if (optRedirectAddress.isPresent()) {
+ optRedirectAddress.get().whenComplete(
+ (String address, Throwable throwable) -> {
+ if (throwable != null) {
+ logger.error("Failed to obtain redirect address.", throwable);
+ sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ } else {
+ try {
+ HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(
+ address, requestPath, httpsEnabled);
+
+ KeepAliveWrite.flush(ctx, routed.request(), redirect);
+ } catch (Exception e) {
+ logger.error("Failed to send redirect response.", e);
+ sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ }
+ });
+ } else {
respondAsLeader(ctx, request, requestPath);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index e27d125..642d2f4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -21,18 +21,24 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import javax.annotation.Nullable;
+
+import java.nio.charset.Charset;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -45,25 +51,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class HandlerRedirectUtils {
- public static String getRedirectAddress(
+ public static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
+
+ public static Optional<CompletableFuture<String>> getRedirectAddress(
String localJobManagerAddress,
- JobManagerGateway jobManagerGateway,
+ RestfulGateway restfulGateway,
Time timeout) throws Exception {
- final String leaderAddress = jobManagerGateway.getAddress();
+ final String leaderAddress = restfulGateway.getAddress();
final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
if (!localJobManagerAddress.equals(leaderAddress) &&
!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
- // We are not the leader and need to redirect
- final String hostname = jobManagerGateway.getHostname();
- final CompletableFuture<Integer> webMonitorPortFuture = jobManagerGateway.requestWebPort(timeout);
- final int webMonitorPort = webMonitorPortFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- return String.format("%s:%d", hostname, webMonitorPort);
+ return Optional.of(restfulGateway.requestRestAddress(timeout));
+
} else {
- return null;
+ return Optional.empty();
}
}
@@ -94,4 +99,31 @@ public class HandlerRedirectUtils {
return unavailableResponse;
}
+
+ public static FullHttpResponse getResponse(HttpResponseStatus status, @Nullable String message) {
+ ByteBuf messageByteBuf = message == null ? Unpooled.buffer(0)
+ : Unpooled.wrappedBuffer(message.getBytes(ENCODING));
+ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, messageByteBuf);
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+ return response;
+ }
+
+ public static FullHttpResponse getErrorResponse(Throwable throwable) {
+ return getErrorResponse(throwable, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+
+ public static FullHttpResponse getErrorResponse(Throwable throwable, HttpResponseStatus status) {
+ byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
+ FullHttpResponse response = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1,
+ status,
+ Unpooled.wrappedBuffer(bytes));
+
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+ return response;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index f175573..b382b4c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -120,7 +120,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
}
public TaskManagerLogHandler(
- JobManagerRetriever retriever,
+ GatewayRetriever<JobManagerGateway> retriever,
Executor executor,
CompletableFuture<String> localJobManagerAddressPromise,
Time timeout,
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 3fe4d12..a5f4ca5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.Preconditions;
@@ -53,7 +53,7 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
public class MetricFetcher {
private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
- private final JobManagerRetriever retriever;
+ private final GatewayRetriever<JobManagerGateway> retriever;
private final MetricQueryServiceRetriever queryServiceRetriever;
private final Executor executor;
private final Time timeout;
@@ -64,7 +64,7 @@ public class MetricFetcher {
private long lastUpdateTime;
public MetricFetcher(
- JobManagerRetriever retriever,
+ GatewayRetriever<JobManagerGateway> retriever,
MetricQueryServiceRetriever queryServiceRetriever,
Executor executor,
Time timeout) {
@@ -98,7 +98,7 @@ public class MetricFetcher {
private void fetchMetrics() {
try {
- Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
if (optJobManagerGateway.isPresent()) {
final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 5829d1c..10b5ced 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
@@ -509,11 +510,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
private void waitForLeaderNotification(
String expectedJobManagerURL,
- AkkaJobManagerRetriever retriever,
+ GatewayRetriever<JobManagerGateway> retriever,
Deadline deadline) throws Exception {
while (deadline.hasTimeLeft()) {
- Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
if (optJobManagerGateway.isPresent() && Objects.equals(expectedJobManagerURL, optJobManagerGateway.get().getAddress())) {
return;
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
index ac8d934..a8562b3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
@@ -19,12 +19,14 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static org.mockito.Matchers.any;
@@ -47,26 +49,29 @@ public class HandlerRedirectUtilsTest extends TestLogger {
JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar");
- String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+ Optional<CompletableFuture<String>> redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
localJobManagerAddress,
jobManagerGateway,
Time.seconds(3L));
- Assert.assertNull(redirectingAddress);
+ Assert.assertFalse(redirectingAddress.isPresent());
}
@Test
public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception {
- JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+ JobManagerGateway jobManagerGateway = mock(AkkaJobManagerGateway.class);
when(jobManagerGateway.getAddress()).thenReturn(remotePath);
when(jobManagerGateway.getHostname()).thenReturn(remoteHostname);
when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(webPort));
+ when(jobManagerGateway.requestRestAddress(any(Time.class))).thenCallRealMethod();
- String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+ Optional<CompletableFuture<String>> optRedirectingAddress = HandlerRedirectUtils.getRedirectAddress(
localJobManagerAddress,
jobManagerGateway,
Time.seconds(3L));
- Assert.assertEquals(remoteURL, redirectingAddress);
+ Assert.assertTrue(optRedirectingAddress.isPresent());
+
+ Assert.assertEquals(remoteURL, optRedirectingAddress.get().get());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index cf59f05..c11fe6a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -63,7 +63,7 @@ public class TaskManagerLogHandlerTest {
@Test
public void testGetPaths() {
TaskManagerLogHandler handlerLog = new TaskManagerLogHandler(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
Executors.directExecutor(),
CompletableFuture.completedFuture("/jm/address"),
TestingUtils.TIMEOUT(),
@@ -76,7 +76,7 @@ public class TaskManagerLogHandlerTest {
Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]);
TaskManagerLogHandler handlerOut = new TaskManagerLogHandler(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
Executors.directExecutor(),
CompletableFuture.completedFuture("/jm/address"),
TestingUtils.TIMEOUT(),
@@ -113,8 +113,8 @@ public class TaskManagerLogHandlerTest {
when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), any(Time.class))).thenReturn(
CompletableFuture.completedFuture(Optional.of(taskManager)));
- JobManagerRetriever retriever = mock(JobManagerRetriever.class);
- when(retriever.getJobManagerGatewayNow())
+ GatewayRetriever<JobManagerGateway> retriever = mock(GatewayRetriever.class);
+ when(retriever.getNow())
.thenReturn(Optional.of(jobManagerGateway));
TaskManagerLogHandler handler = new TaskManagerLogHandler(
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 5296d33..0755888 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
@@ -43,7 +43,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
@Test
public void testHandleRequest() throws Exception {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
@@ -96,7 +96,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
@Test
public void testInvalidListDoesNotFail() {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
@@ -126,7 +126,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
@Test
public void testInvalidGetDoesNotFail() {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index b02949a..6d17b40 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
@@ -49,7 +49,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
@Test
public void getMapFor() {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
@@ -67,7 +67,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
@Test
public void getMapForNull() {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index 569f772..b26ceab 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
@@ -50,7 +50,7 @@ public class JobMetricsHandlerTest extends TestLogger {
@Test
public void getMapFor() throws Exception {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
@@ -69,7 +69,7 @@ public class JobMetricsHandlerTest extends TestLogger {
@Test
public void getMapForNull() {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index e6bbd2e..d637a4a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
@@ -51,7 +51,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
@Test
public void getMapFor() throws Exception {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
@@ -73,7 +73,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
@Test
public void getMapForNull() {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 4c91997..20c6373 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
@@ -96,8 +97,8 @@ public class MetricFetcherTest extends TestLogger {
when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(0));
- AkkaJobManagerRetriever retriever = mock(AkkaJobManagerRetriever.class);
- when(retriever.getJobManagerGatewayNow())
+ GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
+ when(retriever.getNow())
.thenReturn(Optional.of(jobManagerGateway));
// ========= setup QueryServices ================================================================================
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index c4c1c7a..9c5549e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
@@ -50,7 +50,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
@Test
public void getMapFor() throws Exception {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
@@ -69,7 +69,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
@Test
public void getMapForNull() {
MetricFetcher fetcher = new MetricFetcher(
- mock(JobManagerRetriever.class),
+ mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
TestingUtils.TIMEOUT());
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index bbc5889..b9a7a8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -267,4 +267,10 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
.mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class)));
}
+
+ @Override
+ public CompletableFuture<String> requestRestAddress(Time timeout) {
+ return requestWebPort(timeout).thenApply(
+ (Integer webPort) -> getHostname() + ':' + webPort);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index d24a3d0..bd3e304 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -33,9 +33,10 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.NetUtils;
@@ -186,7 +187,7 @@ public class BootstrapTools {
public static WebMonitor startWebMonitorIfConfigured(
Configuration config,
HighAvailabilityServices highAvailabilityServices,
- JobManagerRetriever jobManagerRetriever,
+ LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
MetricQueryServiceRetriever queryServiceRetriever,
Time timeout,
Executor executor,
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index a4d0d11..f2aaf17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
import javax.annotation.Nullable;
@@ -44,7 +44,7 @@ import java.util.concurrent.CompletableFuture;
* <p>This interface constitutes the operations an external component can
* trigger on the JobManager.
*/
-public interface JobManagerGateway extends RpcGateway {
+public interface JobManagerGateway extends RestfulGateway {
/**
* Requests the BlobServer port.
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
new file mode 100644
index 0000000..a5d52e5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Gateway for restful endpoints.
+ *
+ * <p>Gateways which implement this interface run a REST endpoint which is reachable
+ * under the address returned by {@link #requestRestAddress(Time)}.
+ */
+public interface RestfulGateway extends RpcGateway {
+
+ /**
+ * Requests the REST address of this {@link RpcEndpoint}.
+ *
+ * @param timeout for this operation
+ * @return Future REST endpoint address
+ */
+ CompletableFuture<String> requestRestAddress(Time timeout);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 9493696..57996bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -29,10 +29,11 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import com.fasterxml.jackson.databind.JsonNode;
@@ -130,7 +131,7 @@ public final class WebMonitorUtils {
public static WebMonitor startWebRuntimeMonitor(
Configuration config,
HighAvailabilityServices highAvailabilityServices,
- JobManagerRetriever jobManagerRetriever,
+ LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
MetricQueryServiceRetriever queryServiceRetriever,
Time timeout,
Executor executor) {
@@ -143,7 +144,7 @@ public final class WebMonitorUtils {
Configuration.class,
LeaderRetrievalService.class,
BlobView.class,
- JobManagerRetriever.class,
+ LeaderGatewayRetriever.class,
MetricQueryServiceRetriever.class,
Time.class,
Executor.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
new file mode 100644
index 0000000..1771b72
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Generic retriever interface for {@link RpcGateway}.
+ *
+ * @param <T> type of the object to retrieve
+ */
+public interface GatewayRetriever<T extends RpcGateway> {
+
+ /**
+ * Get future of object to retrieve.
+ *
+ * @return Future object to retrieve
+ */
+ CompletableFuture<T> getFuture();
+
+ /**
+ * Returns the currently retrieved object if there is such an object. Otherwise
+ * it returns an empty optional.
+ *
+ * @return Optional object to retrieve
+ * @throws Exception if the future has been completed with an exception
+ */
+ default Optional<T> getNow() throws Exception {
+ CompletableFuture<T> leaderFuture = getFuture();
+ if (leaderFuture != null) {
+ CompletableFuture<T> currentLeaderFuture = leaderFuture;
+
+ if (currentLeaderFuture.isDone()) {
+ return Optional.of(currentLeaderFuture.get());
+ } else {
+ return Optional.empty();
+ }
+ } else {
+ return Optional.empty();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
deleted file mode 100644
index 2eade48..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.retriever;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.util.FlinkException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Retrieves and stores the JobManagerGateway for the current leading JobManager.
- */
-public abstract class JobManagerRetriever implements LeaderRetrievalListener {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- // False if we have to create a new JobManagerGateway future when being notified
- // about a new leader address
- private final AtomicBoolean firstTimeUsage;
-
- private volatile CompletableFuture<JobManagerGateway> jobManagerGatewayFuture;
-
- public JobManagerRetriever() {
- firstTimeUsage = new AtomicBoolean(true);
- jobManagerGatewayFuture = new CompletableFuture<>();
- }
-
- /**
- * Returns the currently known leading job manager gateway and its web monitor port.
- */
- public Optional<JobManagerGateway> getJobManagerGatewayNow() throws Exception {
- if (jobManagerGatewayFuture != null) {
- CompletableFuture<JobManagerGateway> jobManagerGatewayFuture = this.jobManagerGatewayFuture;
-
- if (jobManagerGatewayFuture.isDone()) {
- return Optional.of(jobManagerGatewayFuture.get());
- } else {
- return Optional.empty();
- }
- } else {
- return Optional.empty();
- }
- }
-
- /**
- * Returns the current JobManagerGateway future.
- */
- public CompletableFuture<JobManagerGateway> getJobManagerGateway() throws Exception {
- return jobManagerGatewayFuture;
- }
-
- @Override
- public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
- if (leaderAddress != null && !leaderAddress.equals("")) {
- try {
- final CompletableFuture<JobManagerGateway> newJobManagerGatewayFuture;
-
- if (firstTimeUsage.compareAndSet(true, false)) {
- newJobManagerGatewayFuture = jobManagerGatewayFuture;
- } else {
- newJobManagerGatewayFuture = new CompletableFuture<>();
- jobManagerGatewayFuture = newJobManagerGatewayFuture;
- }
-
- log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
-
- createJobManagerGateway(leaderAddress, leaderSessionID).whenComplete(
- (JobManagerGateway jobManagerGateway, Throwable throwable) -> {
- if (throwable != null) {
- newJobManagerGatewayFuture.completeExceptionally(new FlinkException("Could not retrieve" +
- "the current job manager gateway.", throwable));
- } else {
- newJobManagerGatewayFuture.complete(jobManagerGateway);
- }
- }
- );
- }
- catch (Exception e) {
- handleError(e);
- }
- }
- }
-
- @Override
- public void handleError(Exception exception) {
- log.error("Received error from LeaderRetrievalService.", exception);
-
- jobManagerGatewayFuture.completeExceptionally(exception);
- }
-
- /**
- * Create a JobManagerGateway for the given leader address and leader id.
- *
- * @param leaderAddress to connect against
- * @param leaderId the endpoint currently uses
- * @return Future containing the resolved JobManagerGateway
- * @throws Exception if the JobManagerGateway creation failed
- */
- protected abstract CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
new file mode 100644
index 0000000..4e59859
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Retrieves and stores the leading {@link RpcGateway}.
+ *
+ * @param <T> type of the gateway to retrieve
+ */
+public abstract class LeaderGatewayRetriever<T extends RpcGateway> extends LeaderRetriever implements GatewayRetriever<T> {
+
+ private volatile CompletableFuture<T> gatewayFuture;
+
+ public LeaderGatewayRetriever() {
+ gatewayFuture = createGateway(getLeaderFuture());
+ }
+
+ @Override
+ public CompletableFuture<T> getFuture() {
+ return gatewayFuture;
+ }
+
+ @Override
+ public CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
+ CompletableFuture<Tuple2<String, UUID>> newFuture = super.createNewFuture();
+
+ gatewayFuture = createGateway(newFuture);
+
+ return newFuture;
+ }
+
+ protected abstract CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
new file mode 100644
index 0000000..fbfb507
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Retrieves and stores the current leader address.
+ */
+public class LeaderRetriever implements LeaderRetrievalListener {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ // False if we have to create a new leader future when being notified
+ // about a new leader address
+ private final AtomicBoolean firstTimeUsage;
+
+ protected volatile CompletableFuture<Tuple2<String, UUID>> leaderFuture;
+
+ public LeaderRetriever() {
+ firstTimeUsage = new AtomicBoolean(true);
+ leaderFuture = new CompletableFuture<>();
+ }
+
+ /**
+ * Returns the current leader information if available. Otherwise it returns an
+ * empty optional.
+ *
+ * @return The current leader information if available. Otherwise it returns an
+ * empty optional.
+ * @throws Exception if the leader future has been completed with an exception
+ */
+ public Optional<Tuple2<String, UUID>> getLeaderNow() throws Exception {
+ CompletableFuture<Tuple2<String, UUID>> leaderFuture = this.leaderFuture;
+ if (leaderFuture != null) {
+ CompletableFuture<Tuple2<String, UUID>> currentLeaderFuture = leaderFuture;
+
+ if (currentLeaderFuture.isDone()) {
+ return Optional.of(currentLeaderFuture.get());
+ } else {
+ return Optional.empty();
+ }
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Returns the current leader future, i.e. the future of the leader's address and session id.
+ */
+ public CompletableFuture<Tuple2<String, UUID>> getLeaderFuture() {
+ return leaderFuture;
+ }
+
+ @Override
+ public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+ if (leaderAddress != null && !leaderAddress.equals("")) {
+ try {
+ final CompletableFuture<Tuple2<String, UUID>> newLeaderFuture;
+
+ if (firstTimeUsage.compareAndSet(true, false)) {
+ newLeaderFuture = leaderFuture;
+ } else {
+ newLeaderFuture = createNewFuture();
+ leaderFuture = newLeaderFuture;
+ }
+
+ log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
+
+ newLeaderFuture.complete(Tuple2.of(leaderAddress, leaderSessionID));
+ }
+ catch (Exception e) {
+ handleError(e);
+ }
+ }
+ }
+
+ @Override
+ public void handleError(Exception exception) {
+ log.error("Received error from LeaderRetrievalService.", exception);
+
+ leaderFuture.completeExceptionally(exception);
+ }
+
+ protected CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
+ return new CompletableFuture<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
index 027b42a..121387b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
@@ -19,13 +19,14 @@
package org.apache.flink.runtime.webmonitor.retriever.impl;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.util.Preconditions;
import akka.actor.ActorRef;
@@ -35,9 +36,9 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
- * {@link JobManagerRetriever} implementation for Akka based JobManagers.
+ * {@link LeaderGatewayRetriever} implementation for Akka based JobManagers.
*/
-public class AkkaJobManagerRetriever extends JobManagerRetriever {
+public class AkkaJobManagerRetriever extends LeaderGatewayRetriever<JobManagerGateway> {
private final ActorSystem actorSystem;
private final Time timeout;
@@ -51,19 +52,21 @@ public class AkkaJobManagerRetriever extends JobManagerRetriever {
}
@Override
- protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
- return FutureUtils.toJava(
- AkkaUtils.getActorRefFuture(
- leaderAddress,
- actorSystem,
- FutureUtils.toFiniteDuration(timeout)))
- .thenApplyAsync(
- (ActorRef jobManagerRef) -> {
- ActorGateway leaderGateway = new AkkaActorGateway(
- jobManagerRef, leaderId);
+ protected CompletableFuture<JobManagerGateway> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
+ return leaderFuture.thenCompose(
+ (Tuple2<String, UUID> addressLeaderId) ->
+ FutureUtils.toJava(
+ AkkaUtils.getActorRefFuture(
+ addressLeaderId.f0,
+ actorSystem,
+ FutureUtils.toFiniteDuration(timeout)))
+ .thenApplyAsync(
+ (ActorRef jobManagerRef) -> {
+ ActorGateway leaderGateway = new AkkaActorGateway(
+ jobManagerRef, addressLeaderId.f1);
- return new AkkaJobManagerGateway(leaderGateway);
- },
- actorSystem.dispatcher());
+ return new AkkaJobManagerGateway(leaderGateway);
+ }
+ ));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java
new file mode 100644
index 0000000..86afc63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetriever.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link LeaderGatewayRetriever} implementation using the {@link RpcService}.
+ *
+ * @param <T> type of the gateway to retrieve
+ */
+public class RpcGatewayRetriever<T extends RpcGateway> extends LeaderGatewayRetriever<T> {
+
+ private final RpcService rpcService;
+ private final Class<T> gatewayType;
+
+ public RpcGatewayRetriever(RpcService rpcService, Class<T> gatewayType) {
+ this.rpcService = Preconditions.checkNotNull(rpcService);
+ this.gatewayType = Preconditions.checkNotNull(gatewayType);
+ }
+
+ @Override
+ protected CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
+ return leaderFuture.thenCompose(
+ (Tuple2<String, UUID> addressLeaderTuple) -> rpcService.connect(addressLeaderTuple.f0, gatewayType));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
deleted file mode 100644
index e608aa0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.retriever.impl;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
-import org.apache.flink.util.Preconditions;
-
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * JobManagerRetriever implementation for Flip-6 JobManager.
- */
-public class RpcJobManagerRetriever extends JobManagerRetriever {
-
- private final RpcService rpcService;
-
- public RpcJobManagerRetriever(
- RpcService rpcService) {
-
- this.rpcService = Preconditions.checkNotNull(rpcService);
- }
-
- @Override
- protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
- return rpcService.connect(leaderAddress, JobManagerGateway.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
new file mode 100644
index 0000000..d02f3ef
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorTest;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Test for the {@link AkkaJobManagerRetriever}.
+ */
+public class AkkaJobManagerRetrieverTest extends TestLogger {
+
+ private static final Time timeout = Time.seconds(10L);
+ private static ActorSystem actorSystem;
+
+ @BeforeClass
+ public static void setup() {
+ actorSystem = AkkaUtils.createDefaultActorSystem();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (actorSystem != null) {
+ actorSystem.shutdown();
+ actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
+
+ actorSystem = null;
+ }
+ }
+
+ /**
+ * Tests that we can retrieve the current leading job manager.
+ */
+ @Test
+ public void testAkkaJobManagerRetrieval() throws Exception {
+ AkkaJobManagerRetriever akkaJobManagerRetriever = new AkkaJobManagerRetriever(actorSystem, timeout);
+ TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+ CompletableFuture<JobManagerGateway> gatewayFuture = akkaJobManagerRetriever.getFuture();
+ final UUID leaderSessionId = UUID.randomUUID();
+
+ ActorRef actorRef = null;
+
+ try {
+ actorRef = actorSystem.actorOf(
+ Props.create(JobClientActorTest.PlainActor.class, leaderSessionId));
+
+ final String address = actorRef.path().toString();
+
+ testingLeaderRetrievalService.start(akkaJobManagerRetriever);
+
+ // check that the gateway future has not been completed since there is no leader yet
+ assertFalse(gatewayFuture.isDone());
+
+ testingLeaderRetrievalService.notifyListener(address, leaderSessionId);
+
+ JobManagerGateway jobManagerGateway = gatewayFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ assertEquals(address, jobManagerGateway.getAddress());
+ } finally {
+ testingLeaderRetrievalService.stop();
+
+ if (actorRef != null) {
+ TestingUtils.stopActorGracefully(actorRef);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad0d351/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
new file mode 100644
index 0000000..1ca3918
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for the {@link RpcGatewayRetriever}.
+ */
+public class RpcGatewayRetrieverTest extends TestLogger {
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+ private static TestingRpcService rpcService;
+
+ @BeforeClass
+ public static void setup() {
+ rpcService = new TestingRpcService();
+ }
+
+ @AfterClass
+ public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
+ if (rpcService != null) {
+ rpcService.stopService();
+ rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ rpcService = null;
+ }
+ }
+
+ /**
+ * Tests that the RpcGatewayRetriever can retrieve the specified gateway type from a leader retrieval service.
+ */
+ @Test
+ public void testRpcGatewayRetrieval() throws Exception {
+ final String expectedValue = "foobar";
+ final String expectedValue2 = "barfoo";
+ final UUID leaderSessionId = UUID.randomUUID();
+
+ RpcGatewayRetriever<DummyGateway> gatewayRetriever = new RpcGatewayRetriever<>(rpcService, DummyGateway.class);
+ TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+ DummyRpcEndpoint dummyRpcEndpoint = new DummyRpcEndpoint(rpcService, "dummyRpcEndpoint1", expectedValue);
+ DummyRpcEndpoint dummyRpcEndpoint2 = new DummyRpcEndpoint(rpcService, "dummyRpcEndpoint2", expectedValue2);
+ rpcService.registerGateway(dummyRpcEndpoint.getAddress(), dummyRpcEndpoint.getSelfGateway(DummyGateway.class));
+ rpcService.registerGateway(dummyRpcEndpoint2.getAddress(), dummyRpcEndpoint2.getSelfGateway(DummyGateway.class));
+
+ try {
+ dummyRpcEndpoint.start();
+ dummyRpcEndpoint2.start();
+
+ testingLeaderRetrievalService.start(gatewayRetriever);
+
+ final CompletableFuture<DummyGateway> gatewayFuture = gatewayRetriever.getFuture();
+
+ assertFalse(gatewayFuture.isDone());
+
+ testingLeaderRetrievalService.notifyListener(dummyRpcEndpoint.getAddress(), leaderSessionId);
+
+ final DummyGateway dummyGateway = gatewayFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ assertEquals(dummyRpcEndpoint.getAddress(), dummyGateway.getAddress());
+ assertEquals(expectedValue, dummyGateway.foobar(TIMEOUT).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS));
+
+ // elect a new leader
+ testingLeaderRetrievalService.notifyListener(dummyRpcEndpoint2.getAddress(), leaderSessionId);
+
+ final CompletableFuture<DummyGateway> gatewayFuture2 = gatewayRetriever.getFuture();
+ final DummyGateway dummyGateway2 = gatewayFuture2.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ assertEquals(dummyRpcEndpoint2.getAddress(), dummyGateway2.getAddress());
+ assertEquals(expectedValue2, dummyGateway2.foobar(TIMEOUT).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS));
+ } finally {
+ dummyRpcEndpoint.shutDown();
+ dummyRpcEndpoint2.shutDown();
+ dummyRpcEndpoint.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+ dummyRpcEndpoint2.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Testing RpcGateway.
+ */
+ public interface DummyGateway extends RpcGateway {
+ CompletableFuture<String> foobar(@RpcTimeout Time timeout);
+ }
+
+ static class DummyRpcEndpoint extends RpcEndpoint implements DummyGateway {
+
+ private final String value;
+
+ protected DummyRpcEndpoint(RpcService rpcService, String endpointId, String value) {
+ super(rpcService, endpointId);
+ this.value = value;
+ }
+
+ @Override
+ public CompletableFuture<String> foobar(Time timeout) {
+ return CompletableFuture.completedFuture(value);
+ }
+ }
+}