You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/11 11:48:58 UTC
[1/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor
from ActorGateway
Repository: flink
Updated Branches:
refs/heads/master 00d5b6222 -> 9f790d3ef
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
new file mode 100644
index 0000000..e608aa0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
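+ * {@link JobManagerRetriever} implementation for the Flip-6 JobManager which connects to the leader's {@link JobManagerGateway} through the {@link RpcService}.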
+ */
+public class RpcJobManagerRetriever extends JobManagerRetriever {
+
+ private final RpcService rpcService;
+
+ public RpcJobManagerRetriever(
+ RpcService rpcService) {
+
+ this.rpcService = Preconditions.checkNotNull(rpcService);
+ }
+
+ @Override
+ protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
+ return rpcService.connect(leaderAddress, JobManagerGateway.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e490b48..1616a7b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.messages._
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors}
import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -82,6 +82,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
import org.apache.flink.runtime.taskexecutor.TaskExecutor
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util._
+import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.util.{InstantiationUtil, NetUtils, SerializedThrowable}
@@ -2219,12 +2220,17 @@ object JobManager {
if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
LOG.info("Starting JobManager web frontend")
+ val timeout = FutureUtils.toTime(AkkaUtils.getTimeout(configuration))
+
// start the web frontend. we need to load this dynamically
// because it is not in the same project/dependencies
val webServer = WebMonitorUtils.startWebRuntimeMonitor(
configuration,
highAvailabilityServices,
- jobManagerSystem)
+ new AkkaJobManagerRetriever(jobManagerSystem, timeout),
+ new AkkaQueryServiceRetriever(jobManagerSystem, timeout),
+ timeout,
+ jobManagerSystem.dispatcher)
Option(webServer)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bc323cc..831c026 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -32,7 +32,7 @@ import org.apache.flink.configuration._
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
-import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors}
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader
import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
+import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
import org.apache.flink.util.NetUtils
import org.slf4j.LoggerFactory
@@ -389,6 +390,8 @@ abstract class FlinkMiniCluster(
config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
config.getInteger(WebOptions.PORT, 0) >= 0) {
+ val flinkTimeout = FutureUtils.toTime(timeout)
+
LOG.info("Starting JobManger web frontend")
// start the new web frontend. we need to load this dynamically
// because it is not in the same project/dependencies
@@ -396,7 +399,10 @@ abstract class FlinkMiniCluster(
WebMonitorUtils.startWebRuntimeMonitor(
config,
highAvailabilityServices,
- actorSystem)
+ new AkkaJobManagerRetriever(actorSystem, flinkTimeout),
+ new AkkaQueryServiceRetriever(actorSystem, flinkTimeout),
+ flinkTimeout,
+ actorSystem.dispatcher)
)
webServer.foreach(_.start(jobManagerAkkaURL))
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 858bbbb..ddbb82d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -62,6 +62,8 @@ object TestingUtils {
val TESTING_TIMEOUT = 1 minute
+ val TIMEOUT = Time.minutes(1L)
+
val DEFAULT_AKKA_ASK_TIMEOUT = "200 s"
def getDefaultTestingActorSystemConfigString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 88cc585..9130901 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -44,6 +45,8 @@ import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -358,11 +361,16 @@ public class YarnApplicationMasterRunner {
// 2: the web monitor
LOG.debug("Starting Web Frontend");
+ Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
+
webMonitor = BootstrapTools.startWebMonitorIfConfigured(
config,
highAvailabilityServices,
- actorSystem,
- jobManager,
+ new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
+ new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
+ webMonitorTimeout,
+ futureExecutor,
+ AkkaUtils.getAkkaURL(actorSystem, jobManager),
LOG);
String protocol = "http://";
[3/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor
from ActorGateway
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index d1aeea4..e2437e6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -45,7 +45,7 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index c8ec689..3526734 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -19,8 +19,9 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
@@ -33,17 +34,23 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
private static final String JOB_STOPPING_REST_PATH = "/jobs/:jobid/stop";
private static final String JOB_STOPPING_YARN_REST_PATH = "/jobs/:jobid/yarn-stop";
+ private final Time timeout;
+
+ public JobStoppingHandler(Time timeout) {
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+
@Override
public String[] getPaths() {
return new String[]{JOB_STOPPING_REST_PATH, JOB_STOPPING_YARN_REST_PATH};
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
- JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
- if (jobManager != null) {
- jobManager.tell(new JobManagerMessages.StopJob(jobid));
+ JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+ if (jobManagerGateway != null) {
+ jobManagerGateway.stopJob(jobId, timeout);
return "{}";
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 8646df9..079be8f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,7 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.NotFoundException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
@@ -40,16 +41,16 @@ public interface RequestHandler {
*
* @param pathParams The map of REST path parameters, decoded by the router.
* @param queryParams The map of query parameters.
- * @param jobManager The JobManager actor.
+ * @param jobManagerGateway The gateway used to talk to the JobManager.
*
* @return The full http response.
*
* @throws Exception Handlers may forward exceptions. Exceptions of type
- * {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
+ * {@link NotFoundException} will cause a HTTP 404
* response with the exception message, other exceptions will cause a HTTP 500 response
* with the exception stack trace.
*/
- FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
+ FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception;
/**
* Returns an array of REST URL's under which this handler can be registered.
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 3562874..b7fee2d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -33,13 +33,11 @@ import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -62,7 +60,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
-import akka.dispatch.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,13 +71,10 @@ import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-
-import scala.Option;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
+import java.util.concurrent.Executor;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -115,9 +109,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
/** Indicates which log file should be displayed. */
private FileMode fileMode;
- private final ExecutionContextExecutor executor;
-
- private final Time timeTimeout;
+ private final Executor executor;
private final BlobView blobView;
@@ -129,9 +121,9 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
public TaskManagerLogHandler(
JobManagerRetriever retriever,
- ExecutionContextExecutor executor,
- scala.concurrent.Future<String> localJobManagerAddressPromise,
- FiniteDuration timeout,
+ Executor executor,
+ CompletableFuture<String> localJobManagerAddressPromise,
+ Time timeout,
FileMode fileMode,
Configuration config,
boolean httpsEnabled,
@@ -143,8 +135,6 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
this.fileMode = fileMode;
this.blobView = Preconditions.checkNotNull(blobView, "blobView");
-
- timeTimeout = Time.milliseconds(timeout.toMillis());
}
@Override
@@ -162,20 +152,18 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
* Response when running with leading JobManager.
*/
@Override
- protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final ActorGateway jobManager) {
+ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final JobManagerGateway jobManagerGateway) {
if (cache == null) {
- scala.concurrent.Future<Object> portFuture = jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
- scala.concurrent.Future<BlobCache> cacheFuture = portFuture.map(new Mapper<Object, BlobCache>() {
- @Override
- public BlobCache checkedApply(Object result) throws IOException {
- Option<String> hostOption = jobManager.actor().path().address().host();
- String host = hostOption.isDefined() ? hostOption.get() : "localhost";
- int port = (int) result;
- return new BlobCache(new InetSocketAddress(host, port), config, blobView);
- }
- }, executor);
-
- cache = FutureUtils.toJava(cacheFuture);
+ CompletableFuture<Integer> blobPortFuture = jobManagerGateway.requestBlobServerPort(timeout);
+ cache = blobPortFuture.thenApplyAsync(
+ (Integer port) -> {
+ try {
+ return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create BlobCache.", e);
+ }
+ },
+ executor);
}
final String taskManagerID = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
@@ -185,22 +173,18 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) {
try {
InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
- scala.concurrent.Future<JobManagerMessages.TaskManagerInstance> scalaTaskManagerFuture = jobManager
- .ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout)
- .mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class));
-
- CompletableFuture<JobManagerMessages.TaskManagerInstance> taskManagerFuture = FutureUtils.toJava(scalaTaskManagerFuture);
+ CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
- taskManagerInstance -> {
- Instance taskManager = taskManagerInstance.instance().get();
-
+ (Optional<Instance> optTMInstance) -> {
+ Instance taskManagerInstance = optTMInstance.orElseThrow(
+ () -> new FlinkFutureException("Could not find instance with " + instanceID + '.'));
switch (fileMode) {
case LOG:
- return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
+ return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);
case STDOUT:
default:
- return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
+ return taskManagerInstance.getTaskManagerGateway().requestTaskManagerStdout(timeout);
}
}
);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index 6ad490e..a8ab7a3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -18,12 +18,10 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
-import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
import org.apache.flink.util.StringUtils;
@@ -32,12 +30,12 @@ import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static java.util.Objects.requireNonNull;
@@ -51,11 +49,11 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
- private final FiniteDuration timeout;
+ private final Time timeout;
private final MetricFetcher fetcher;
- public TaskManagersHandler(FiniteDuration timeout, MetricFetcher fetcher) {
+ public TaskManagersHandler(Time timeout, MetricFetcher fetcher) {
this.timeout = requireNonNull(timeout);
this.fetcher = fetcher;
}
@@ -66,9 +64,9 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
- if (jobManager != null) {
+ if (jobManagerGateway != null) {
// whether one task manager's metrics are requested, or all task manager, we
// return them in an array. This avoids unnecessary code complexity.
// If only one task manager is requested, we only fetch one task manager metrics.
@@ -76,20 +74,21 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
try {
InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
- Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
- TaskManagerInstance instance = (TaskManagerInstance) Await.result(future, timeout);
- if (instance.instance().nonEmpty()) {
- instances.add(instance.instance().get());
- }
+ CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+
+ Optional<Instance> instance = tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ instance.ifPresent(instances::add);
}
// this means the id string was invalid. Keep the list empty.
catch (IllegalArgumentException e){
// do nothing.
}
} else {
- Future<Object> future = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
- RegisteredTaskManagers taskManagers = (RegisteredTaskManagers) Await.result(future, timeout);
- instances.addAll(taskManagers.asJavaCollection());
+ CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+ Collection<Instance> tmInstances = tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ instances.addAll(tmInstances);
}
StringWriter writer = new StringWriter();
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index f96e0c2..d116c56 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
@@ -71,8 +71,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
public String handleJsonRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
- ActorGateway jobManager) throws Exception {
- return super.handleJsonRequest(pathParams, queryParams, jobManager);
+ JobManagerGateway jobManagerGateway) throws Exception {
+ return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index 94b135d..b95f2c4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
import org.apache.flink.util.Preconditions;
@@ -48,7 +48,7 @@ public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
fetcher.update();
String requestedMetricsList = queryParams.get("get");
return requestedMetricsList != null
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 95398b5..3af9c56 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -18,37 +18,29 @@
package org.apache.flink.runtime.webmonitor.metrics;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.Preconditions;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
@@ -61,20 +53,25 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
public class MetricFetcher {
private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
- private final ActorSystem actorSystem;
private final JobManagerRetriever retriever;
- private final ExecutionContext ctx;
- private final FiniteDuration timeout = new FiniteDuration(Duration.create(AkkaOptions.ASK_TIMEOUT.defaultValue()).toMillis(), TimeUnit.MILLISECONDS);
+ private final MetricQueryServiceRetriever queryServiceRetriever;
+ private final Executor executor;
+ private final Time timeout;
private MetricStore metrics = new MetricStore();
private MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
private long lastUpdateTime;
- public MetricFetcher(ActorSystem actorSystem, JobManagerRetriever retriever, ExecutionContext ctx) {
- this.actorSystem = Preconditions.checkNotNull(actorSystem);
+ public MetricFetcher(
+ JobManagerRetriever retriever,
+ MetricQueryServiceRetriever queryServiceRetriever,
+ Executor executor,
+ Time timeout) {
this.retriever = Preconditions.checkNotNull(retriever);
- this.ctx = Preconditions.checkNotNull(ctx);
+ this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.timeout = Preconditions.checkNotNull(timeout);
}
/**
@@ -101,38 +98,38 @@ public class MetricFetcher {
private void fetchMetrics() {
try {
- Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
- if (jobManagerGatewayAndWebPort.isDefined()) {
- ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
+ if (optJobManagerGateway.isPresent()) {
+ final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
/**
* Remove all metrics that belong to a job that is not running and no longer archived.
*/
- Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout);
- jobDetailsFuture
- .onSuccess(new OnSuccess<Object>() {
- @Override
- public void onSuccess(Object result) throws Throwable {
- MultipleJobsDetails details = (MultipleJobsDetails) result;
+ CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+
+ jobDetailsFuture.whenCompleteAsync(
+ (MultipleJobsDetails jobDetails, Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.debug("Fetching of JobDetails failed.", throwable);
+ } else {
ArrayList<String> toRetain = new ArrayList<>();
- for (JobDetails job : details.getRunningJobs()) {
+ for (JobDetails job : jobDetails.getRunningJobs()) {
toRetain.add(job.getJobId().toString());
}
- for (JobDetails job : details.getFinishedJobs()) {
+ for (JobDetails job : jobDetails.getFinishedJobs()) {
toRetain.add(job.getJobId().toString());
}
synchronized (metrics) {
metrics.jobs.keySet().retainAll(toRetain);
}
}
- }, ctx);
- logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
+ },
+ executor);
- String jobManagerPath = jobManager.path();
- String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
- ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);
+ String jobManagerPath = jobManagerGateway.getAddress();
+ String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
- queryMetrics(jobManagerQueryService);
+ retrieveAndQueryMetrics(jmQueryServicePath);
/**
* We first request the list of all registered task managers from the job manager, and then
@@ -140,88 +137,75 @@ public class MetricFetcher {
*
* <p>All stored metrics that do not belong to a registered task manager will be removed.
*/
- Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
- registeredTaskManagersFuture
- .onSuccess(new OnSuccess<Object>() {
- @Override
- public void onSuccess(Object result) throws Throwable {
- Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable();
- List<String> activeTaskManagers = new ArrayList<>();
- for (Instance taskManager : taskManagers) {
- activeTaskManagers.add(taskManager.getId().toString());
-
- String taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
- String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
- ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);
-
- queryMetrics(taskManagerQueryService);
- }
- synchronized (metrics) { // remove all metrics belonging to unregistered task managers
+ CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+ taskManagersFuture.whenCompleteAsync(
+ (Collection<Instance> taskManagers, Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
+ } else {
+ List<String> activeTaskManagers = taskManagers.stream().map(
+ taskManagerInstance -> {
+ final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
+ final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
+
+ retrieveAndQueryMetrics(tmQueryServicePath);
+
+ return taskManagerInstance.getId().toString();
+ }).collect(Collectors.toList());
+
+ synchronized (metrics) {
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
}
}
- }, ctx);
- logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed.");
+ },
+ executor);
}
} catch (Exception e) {
LOG.warn("Exception while fetching metrics.", e);
}
}
- private void logErrorOnFailure(Future<Object> future, final String message) {
- future.onFailure(new OnFailure() {
- @Override
- public void onFailure(Throwable failure) throws Throwable {
- LOG.debug(message, failure);
- }
- }, ctx);
- }
-
/**
- * Requests a metric dump from the given actor.
+ * Retrieves and queries the specified QueryServiceGateway.
*
- * @param actor ActorRef to request the dump from
- */
- private void queryMetrics(ActorRef actor) {
- Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
- metricQueryFuture
- .onSuccess(new OnSuccess<Object>() {
- @Override
- public void onSuccess(Object result) throws Throwable {
- addMetrics(result);
+ * @param queryServicePath specifying the QueryServiceGateway
+ */
+ private void retrieveAndQueryMetrics(String queryServicePath) {
+ final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
+
+ queryServiceGatewayFuture.whenCompleteAsync(
+ (MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
+ if (t != null) {
+ LOG.debug("Could not retrieve QueryServiceGateway.", t);
+ } else {
+ queryMetrics(queryServiceGateway);
}
- }, ctx);
- logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
- }
-
- private void addMetrics(Object result) {
- MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
- List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
- for (MetricDump metric : dumpedMetrics) {
- metrics.add(metric);
- }
+ },
+ executor);
}
/**
- * Helper class that allows mocking of the answer.
- */
- static class BasicGateway {
- private final ActorRef actor;
-
- private BasicGateway(ActorRef actor) {
- this.actor = actor;
- }
-
- /**
- * Sends a message asynchronously and returns its response. The response to the message is
- * returned as a future.
- *
- * @param message Message to be sent
- * @param timeout Timeout until the Future is completed with an AskTimeoutException
- * @return Future which contains the response to the sent message
- */
- public Future<Object> ask(Object message, FiniteDuration timeout) {
- return Patterns.ask(actor, message, new Timeout(timeout));
- }
+ * Query the metrics from the given QueryServiceGateway.
+ *
+ * @param queryServiceGateway to query for metrics
+ */
+ private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
+ queryServiceGateway
+ .queryMetrics(timeout)
+ .whenCompleteAsync(
+ (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
+ if (t != null) {
+ LOG.debug("Fetching metrics failed.", t);
+ } else {
+ List<MetricDump> dumpedMetrics = deserializer.deserialize(result);
+ synchronized (metrics) {
+ for (MetricDump metric : dumpedMetrics) {
+ metrics.add(metric);
+ }
+ }
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 7cd2932..b3ce135 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -158,25 +158,20 @@ public class WebFrontendITCase extends TestLogger {
}
@Test
- public void getTaskmanagers() {
- try {
- String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+ public void getTaskmanagers() throws Exception {
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
- ObjectMapper mapper = new ObjectMapper();
- JsonNode parsed = mapper.readTree(json);
- ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode parsed = mapper.readTree(json);
+ ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
- assertNotNull(taskManagers);
- assertEquals(cluster.numTaskManagers(), taskManagers.size());
+ assertNotNull(taskManagers);
+ assertEquals(cluster.numTaskManagers(), taskManagers.size());
- JsonNode taskManager = taskManagers.get(0);
- assertNotNull(taskManager);
- assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
- assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ JsonNode taskManager = taskManagers.get(0);
+ assertNotNull(taskManager);
+ assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
+ assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index fe16445..5829d1c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.WebOptions;
@@ -27,12 +28,15 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.util.TestLogger;
@@ -44,11 +48,12 @@ import org.apache.curator.test.TestingServer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.powermock.reflect.Whitebox;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -71,7 +76,9 @@ public class WebRuntimeMonitorITCase extends TestLogger {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
- private static final FiniteDuration TestTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(2L, TimeUnit.MINUTES);
+
+ private static final Time TIMEOUT = Time.milliseconds(TEST_TIMEOUT.toMillis());
private final String mainResourcesPath = getClass().getResource("/web").getPath();
@@ -80,7 +87,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
*/
@Test
public void testStandaloneWebRuntimeMonitor() throws Exception {
- final Deadline deadline = TestTimeout.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
TestingCluster flink = null;
WebRuntimeMonitor webMonitor = null;
@@ -89,7 +96,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
// Flink w/o a web monitor
flink = new TestingCluster(new Configuration());
flink.start(true);
- webMonitor = startWebRuntimeMonitor(flink);
+ webMonitor = startWebRuntimeMonitor(flink, TIMEOUT);
try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
String expected = new Scanner(new File(mainResourcesPath + "/index.html"))
@@ -129,10 +136,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
*/
@Test
public void testRedirectToLeader() throws Exception {
- final Deadline deadline = TestTimeout.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
ActorSystem[] jobManagerSystem = new ActorSystem[2];
WebRuntimeMonitor[] webMonitor = new WebRuntimeMonitor[2];
+ AkkaJobManagerRetriever[] jobManagerRetrievers = new AkkaJobManagerRetriever[2];
HighAvailabilityServices highAvailabilityServices = null;
try (TestingServer zooKeeper = new TestingServer()) {
@@ -157,11 +165,16 @@ public class WebRuntimeMonitorITCase extends TestLogger {
}
for (int i = 0; i < webMonitor.length; i++) {
+ jobManagerRetrievers[i] = new AkkaJobManagerRetriever(jobManagerSystem[i], TIMEOUT);
+
webMonitor[i] = new WebRuntimeMonitor(
config,
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
highAvailabilityServices.createBlobStore(),
- jobManagerSystem[i]);
+ jobManagerRetrievers[i],
+ new AkkaQueryServiceRetriever(jobManagerSystem[i], TIMEOUT),
+ TIMEOUT,
+ TestingUtils.defaultExecutor());
}
ActorRef[] jobManager = new ActorRef[2];
@@ -196,22 +209,18 @@ public class WebRuntimeMonitorITCase extends TestLogger {
int followerIndex = (leaderIndex + 1) % 2;
ActorSystem leadingSystem = jobManagerSystem[leaderIndex];
- ActorSystem followerSystem = jobManagerSystem[followerIndex];
WebMonitor leadingWebMonitor = webMonitor[leaderIndex];
WebMonitor followerWebMonitor = webMonitor[followerIndex];
// For test stability reason we have to wait until we are sure that both leader
// listeners have been notified.
- JobManagerRetriever leadingRetriever = Whitebox
- .getInternalState(leadingWebMonitor, "retriever");
-
- JobManagerRetriever followerRetriever = Whitebox
- .getInternalState(followerWebMonitor, "retriever");
+ AkkaJobManagerRetriever leadingRetriever = jobManagerRetrievers[leaderIndex];
+ AkkaJobManagerRetriever followerRetriever = jobManagerRetrievers[followerIndex];
// Wait for the initial notifications
- waitForLeaderNotification(leadingSystem, jobManager[leaderIndex], leadingRetriever, deadline);
- waitForLeaderNotification(leadingSystem, jobManager[leaderIndex], followerRetriever, deadline);
+ waitForLeaderNotification(jobManager[leaderIndex].path().toString(), leadingRetriever, deadline);
+ waitForLeaderNotification(AkkaUtils.getAkkaURL(leadingSystem, jobManager[leaderIndex]), followerRetriever, deadline);
try (
HttpTestClient leaderClient = new HttpTestClient(
@@ -241,7 +250,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
leadingSystem.shutdown();
// Wait for the notification of the follower
- waitForLeaderNotification(followerSystem, jobManager[followerIndex], followerRetriever, deadline);
+ waitForLeaderNotification(jobManager[followerIndex].path().toString(), followerRetriever, deadline);
// Same request to the new leader
followingClient.sendGetRequest("index.html", deadline.timeLeft());
@@ -282,7 +291,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
@Test
public void testLeaderNotAvailable() throws Exception {
- final Deadline deadline = TestTimeout.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
ActorSystem actorSystem = null;
WebRuntimeMonitor webRuntimeMonitor = null;
@@ -305,7 +314,10 @@ public class WebRuntimeMonitorITCase extends TestLogger {
config,
mock(LeaderRetrievalService.class),
mock(BlobView.class),
- actorSystem);
+ new AkkaJobManagerRetriever(actorSystem, TIMEOUT),
+ new AkkaQueryServiceRetriever(actorSystem, TIMEOUT),
+ TIMEOUT,
+ TestingUtils.defaultExecutor());
webRuntimeMonitor.start("akka://schmakka");
@@ -343,7 +355,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
*/
@Test
public void testNoEscape() throws Exception {
- final Deadline deadline = TestTimeout.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
TestingCluster flink = null;
WebRuntimeMonitor webMonitor = null;
@@ -351,7 +363,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
try {
flink = new TestingCluster(new Configuration());
flink.start(true);
- webMonitor = startWebRuntimeMonitor(flink);
+ webMonitor = startWebRuntimeMonitor(flink, TIMEOUT);
try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
String expectedIndex = new Scanner(new File(mainResourcesPath + "/index.html"))
@@ -405,7 +417,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
*/
@Test
public void testNoCopyFromJar() throws Exception {
- final Deadline deadline = TestTimeout.fromNow();
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
TestingCluster flink = null;
WebRuntimeMonitor webMonitor = null;
@@ -413,7 +425,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
try {
flink = new TestingCluster(new Configuration());
flink.start(true);
- webMonitor = startWebRuntimeMonitor(flink);
+ webMonitor = startWebRuntimeMonitor(flink, TIMEOUT);
try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
String expectedIndex = new Scanner(new File(mainResourcesPath + "/index.html"))
@@ -459,7 +471,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
}
private WebRuntimeMonitor startWebRuntimeMonitor(
- TestingCluster flink) throws Exception {
+ TestingCluster flink,
+ Time timeout) throws Exception {
ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
ActorRef jmActor = flink.jobManagerActors().get().head();
@@ -482,7 +495,10 @@ public class WebRuntimeMonitorITCase extends TestLogger {
config,
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
highAvailabilityServices.createBlobStore(),
- jmActorSystem);
+ new AkkaJobManagerRetriever(jmActorSystem, timeout),
+ new AkkaQueryServiceRetriever(jmActorSystem, timeout),
+ timeout,
+ TestingUtils.defaultExecutor());
webMonitor.start(jobManagerAddress);
flink.waitForActorsToBeAlive();
@@ -492,17 +508,14 @@ public class WebRuntimeMonitorITCase extends TestLogger {
// ------------------------------------------------------------------------
private void waitForLeaderNotification(
- ActorSystem system,
- ActorRef expectedLeader,
- JobManagerRetriever retriever,
+ String expectedJobManagerURL,
+ AkkaJobManagerRetriever retriever,
Deadline deadline) throws Exception {
- String expectedJobManagerUrl = AkkaUtils.getAkkaURL(system, expectedLeader);
-
while (deadline.hasTimeLeft()) {
- ActorRef leaderRef = retriever.awaitJobManagerGatewayAndWebPort()._1().actor();
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
- if (AkkaUtils.getAkkaURL(system, leaderRef).equals(expectedJobManagerUrl)) {
+ if (optJobManagerGateway.isPresent() && Objects.equals(expectedJobManagerURL, optJobManagerGateway.get().getAddress())) {
return;
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
index 19e8a49..865385f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
@@ -18,20 +18,18 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.api.common.time.Time;
+
import org.junit.Assert;
import org.junit.Test;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-
/**
* Tests for the ClusterOverviewHandler.
*/
public class ClusterOverviewHandlerTest {
@Test
public void testGetPaths() {
- ClusterOverviewHandler handler = new ClusterOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS));
+ ClusterOverviewHandler handler = new ClusterOverviewHandler(Time.seconds(0L));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/overview", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
index e108774..ea26f5d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
@@ -18,20 +18,18 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.api.common.time.Time;
+
import org.junit.Assert;
import org.junit.Test;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-
/**
* Tests for the CurrentJobIdsHandler.
*/
public class CurrentJobIdsHandlerTest {
@Test
public void testGetPaths() {
- CurrentJobIdsHandler handler = new CurrentJobIdsHandler(new FiniteDuration(0, TimeUnit.SECONDS));
+ CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Time.seconds(0L));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index 9f3d362..64360d3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -35,9 +36,6 @@ import org.junit.Test;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
/**
* Tests for the CurrentJobsOverviewHandler.
@@ -68,17 +66,17 @@ public class CurrentJobsOverviewHandlerTest {
@Test
public void testGetPaths() {
- CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true);
+ CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Time.seconds(0L), true, true);
String[] pathsAll = handlerAll.getPaths();
Assert.assertEquals(1, pathsAll.length);
Assert.assertEquals("/joboverview", pathsAll[0]);
- CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, false);
+ CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Time.seconds(0L), true, false);
String[] pathsRunning = handlerRunning.getPaths();
Assert.assertEquals(1, pathsRunning.length);
Assert.assertEquals("/joboverview/running", pathsRunning[0]);
- CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), false, true);
+ CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Time.seconds(0L), false, true);
String[] pathsCompleted = handlerCompleted.getPaths();
Assert.assertEquals(1, pathsCompleted.length);
Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
index 4ddddca..ac8d934 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
@@ -18,41 +18,54 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
-import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Tests for the HandlerRedirectUtils.
*/
-public class HandlerRedirectUtilsTest {
+public class HandlerRedirectUtilsTest extends TestLogger {
private static final String localJobManagerAddress = "akka.tcp://flink@127.0.0.1:1234/user/foobar";
- private static final String remoteURL = "127.0.0.2:1235";
+ private static final String remoteHostname = "127.0.0.2";
+ private static final int webPort = 1235;
+ private static final String remoteURL = remoteHostname + ':' + webPort;
private static final String remotePath = "akka.tcp://flink@" + remoteURL + "/user/jobmanager";
@Test
public void testGetRedirectAddressWithLocalAkkaPath() throws Exception {
- ActorGateway leaderGateway = new DummyActorGateway("akka://flink/user/foobar");
-
- Tuple2<ActorGateway, Integer> leader = new Tuple2<>(leaderGateway, 1235);
+ JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+ when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar");
- String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(localJobManagerAddress, leader);
+ String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+ localJobManagerAddress,
+ jobManagerGateway,
+ Time.seconds(3L));
Assert.assertNull(redirectingAddress);
}
@Test
public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception {
- ActorGateway leaderGateway = new DummyActorGateway(remotePath);
-
- Tuple2<ActorGateway, Integer> leader = new Tuple2<>(leaderGateway, 1235);
+ JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+ when(jobManagerGateway.getAddress()).thenReturn(remotePath);
+ when(jobManagerGateway.getHostname()).thenReturn(remoteHostname);
+ when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(webPort));
- String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(localJobManagerAddress, leader);
+ String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
+ localJobManagerAddress,
+ jobManagerGateway,
+ Time.seconds(3L));
Assert.assertEquals(remoteURL, redirectingAddress);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index fcbfa02..82aa87a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+
import org.junit.Assert;
import org.junit.Test;
@@ -27,7 +30,7 @@ import org.junit.Test;
public class JarRunHandlerTest {
@Test
public void testGetPaths() {
- JarRunHandler handler = new JarRunHandler(null, null, null);
+ JarRunHandler handler = new JarRunHandler(null, Time.seconds(0L), new Configuration());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jars/:jarid/run", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index 25fca9b..fe55f51 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -31,6 +32,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the JobAccumulatorsHandler.
*/
@@ -51,7 +54,7 @@ public class JobAccumulatorsHandlerTest {
@Test
public void testGetPaths() {
- JobAccumulatorsHandler handler = new JobAccumulatorsHandler(null);
+ JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
index ed54000..98d9353 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
@@ -30,7 +32,7 @@ import java.util.List;
public class JobCancellationHandlerTest {
@Test
public void testGetPaths() {
- JobCancellationHandler handler = new JobCancellationHandler();
+ JobCancellationHandler handler = new JobCancellationHandler(TestingUtils.TIMEOUT());
String[] paths = handler.getPaths();
Assert.assertEquals(2, paths.length);
List<String> pathsList = Lists.newArrayList(paths);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index 64a07c8..b48ee66 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -19,21 +19,20 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import akka.dispatch.ExecutionContexts$;
-import akka.dispatch.Futures;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
@@ -45,15 +44,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -62,13 +60,13 @@ import static org.mockito.Mockito.when;
/**
* Tests for the JobCancellationWithSavepointHandler.
*/
-public class JobCancellationWithSavepointHandlersTest {
+public class JobCancellationWithSavepointHandlersTest extends TestLogger {
- private static final ExecutionContext EC = ExecutionContexts$.MODULE$.fromExecutor(Executors.directExecutor());
+ private static final Executor executor = Executors.directExecutor();
@Test
public void testGetPaths() {
- JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), EC);
+ JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor);
JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler();
String[] triggerPaths = triggerHandler.getPaths();
@@ -94,25 +92,23 @@ public class JobCancellationWithSavepointHandlersTest {
ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
ExecutionGraph graph = mock(ExecutionGraph.class);
CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
- when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+ when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
when(graph.getCheckpointCoordinator()).thenReturn(coord);
when(coord.getCheckpointTimeout()).thenReturn(timeout);
- JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC);
+ JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
Map<String, String> params = new HashMap<>();
params.put("jobid", jobId.toString());
params.put("targetDirectory", "placeholder");
- ActorGateway jobManager = mock(ActorGateway.class);
-
- Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null));
- when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future);
+ JobManagerGateway jobManager = mock(JobManagerGateway.class);
+ when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
- handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+ handler.handleRequest(params, Collections.emptyMap(), jobManager);
- verify(jobManager).ask(any(CancelJobWithSavepoint.class), eq(FiniteDuration.apply(timeout, "ms")));
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class));
}
/**
@@ -125,36 +121,34 @@ public class JobCancellationWithSavepointHandlersTest {
ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
ExecutionGraph graph = mock(ExecutionGraph.class);
CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
- when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+ when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
when(graph.getCheckpointCoordinator()).thenReturn(coord);
when(coord.getCheckpointTimeout()).thenReturn(timeout);
- JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC, "the-default-directory");
+ JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
Map<String, String> params = new HashMap<>();
params.put("jobid", jobId.toString());
- ActorGateway jobManager = mock(ActorGateway.class);
-
- Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null));
- when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future);
+ JobManagerGateway jobManager = mock(JobManagerGateway.class);
+ when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
// 1. Use targetDirectory path param
params.put("targetDirectory", "custom-directory");
handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
- verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
// 2. Use default
params.remove("targetDirectory");
handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
- verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "the-default-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("the-default-directory"), any(Time.class));
// 3. Throw Exception
- handlers = new JobCancellationWithSavepointHandlers(holder, EC, null);
+ handlers = new JobCancellationWithSavepointHandlers(holder, executor, null);
handler = handlers.getTriggerHandler();
try {
@@ -175,10 +169,10 @@ public class JobCancellationWithSavepointHandlersTest {
ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
ExecutionGraph graph = mock(ExecutionGraph.class);
CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
- when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+ when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
when(graph.getCheckpointCoordinator()).thenReturn(coord);
- JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC);
+ JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
@@ -186,16 +180,16 @@ public class JobCancellationWithSavepointHandlersTest {
params.put("jobid", jobId.toString());
params.put("targetDirectory", "custom-directory");
- ActorGateway jobManager = mock(ActorGateway.class);
+ JobManagerGateway jobManager = mock(JobManagerGateway.class);
// Successful
- Promise<Object> promise = new Promise.DefaultPromise<>();
- when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(promise);
+ CompletableFuture<String> successfulCancelWithSavepoint = new CompletableFuture<>();
+ when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
// Trigger
FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
- verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
@@ -226,7 +220,7 @@ public class JobCancellationWithSavepointHandlersTest {
assertEquals(location, root.get("location").asText());
// Only single actual request
- verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
// Query progress
params.put("requestId", "1");
@@ -243,7 +237,7 @@ public class JobCancellationWithSavepointHandlersTest {
assertEquals("1", root.get("request-id").asText());
// Complete
- promise.success(new CancellationSuccess(jobId, "_path-savepoint_"));
+ successfulCancelWithSavepoint.complete("_path-savepoint_");
response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
@@ -301,10 +295,10 @@ public class JobCancellationWithSavepointHandlersTest {
ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
ExecutionGraph graph = mock(ExecutionGraph.class);
CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
- when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph);
+ when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
when(graph.getCheckpointCoordinator()).thenReturn(coord);
- JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC);
+ JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
@@ -312,15 +306,15 @@ public class JobCancellationWithSavepointHandlersTest {
params.put("jobid", jobId.toString());
params.put("targetDirectory", "custom-directory");
- ActorGateway jobManager = mock(ActorGateway.class);
+ JobManagerGateway jobManager = mock(JobManagerGateway.class);
// Successful
- Future<Object> future = Futures.failed(new Exception("Test Exception"));
- when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future);
+ CompletableFuture<String> unsuccessfulCancelWithSavepoint = FutureUtils.completedExceptionally(new Exception("Test Exception"));
+ when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint);
// Trigger
trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
- verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class));
+ verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
// Query progress
params.put("requestId", "1");
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index ad9da6b..104b0a3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -32,6 +33,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the JobConfigHandler.
*/
@@ -52,7 +55,7 @@ public class JobConfigHandlerTest {
@Test
public void testGetPaths() {
- JobConfigHandler handler = new JobConfigHandler(null);
+ JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/config", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index d830707..f3f5943 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -39,6 +40,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the JobDetailsHandler.
*/
@@ -64,7 +67,7 @@ public class JobDetailsHandlerTest {
@Test
public void testGetPaths() {
- JobDetailsHandler handler = new JobDetailsHandler(null, null);
+ JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), null);
String[] paths = handler.getPaths();
Assert.assertEquals(2, paths.length);
List<String> pathsList = Lists.newArrayList(paths);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index 6016d01..f54ab06 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -35,6 +36,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the JobExceptionsHandler.
*/
@@ -55,7 +58,7 @@ public class JobExceptionsHandlerTest {
@Test
public void testGetPaths() {
- JobExceptionsHandler handler = new JobExceptionsHandler(null);
+ JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
index a5ea2b3..17b4c44 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -28,6 +29,8 @@ import org.junit.Test;
import java.util.Collection;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the JobPlanHandler.
*/
@@ -48,7 +51,7 @@ public class JobPlanHandlerTest {
@Test
public void testGetPaths() {
- JobPlanHandler handler = new JobPlanHandler(null);
+ JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/plan", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
index cac0b10..ee47ee9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
@@ -27,10 +30,10 @@ import java.util.List;
/**
* Tests for the JobStoppingHandler.
*/
-public class JobStoppingHandlerTest {
+public class JobStoppingHandlerTest extends TestLogger {
@Test
public void testGetPaths() {
- JobStoppingHandler handler = new JobStoppingHandler();
+ JobStoppingHandler handler = new JobStoppingHandler(TestingUtils.TIMEOUT());
String[] paths = handler.getPaths();
Assert.assertEquals(2, paths.length);
List<String> pathsList = Lists.newArrayList(paths);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index c57aa09..b7af323 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,6 +34,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the JobVertexAccumulatorsHandler.
*/
@@ -54,7 +57,7 @@ public class JobVertexAccumulatorsHandlerTest {
@Test
public void testGetPaths() {
- JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(null);
+ JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
index 8985d89..d2ac0d6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
@@ -46,7 +46,7 @@ import static org.mockito.Mockito.when;
public class JobVertexBackPressureHandlerTest {
@Test
public void testGetPaths() {
- JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(null, mock(BackPressureStatsTracker.class), 0);
+ JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), mock(BackPressureStatsTracker.class), 0);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index bde6a84..bc4fe9c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -35,6 +36,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the JobVertexDetailsHandler.
*/
@@ -56,7 +59,7 @@ public class JobVertexDetailsHandlerTest {
@Test
public void testGetPaths() {
- JobVertexDetailsHandler handler = new JobVertexDetailsHandler(null, null);
+ JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index 8954844..d5d877a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -36,6 +37,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the JobVertexTaskManagersHandler.
*/
@@ -58,7 +61,7 @@ public class JobVertexTaskManagersHandlerTest {
@Test
public void testGetPaths() {
- JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(null, null);
+ JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);
[2/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor
from ActorGateway
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
index f419908..d992b85 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -18,16 +18,20 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
import org.junit.Assert;
import org.junit.Test;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the SubtaskCurrentAttemptDetailsHandler.
*/
public class SubtaskCurrentAttemptDetailsHandlerTest {
@Test
public void testGetPaths() {
- SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(null, null);
+ SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 74a19a9..ce8e72f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,6 +34,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the SubtaskExecutionAttemptAccumulatorsHandler.
*/
@@ -61,7 +64,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
@Test
public void testGetPaths() {
- SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(null);
+ SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index a9161b3..e1fbf92 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -34,6 +35,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the SubtaskExecutionAttemptDetailsHandler.
*/
@@ -70,7 +73,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
@Test
public void testGetPaths() {
- SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(null, null);
+ SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index 6022be2..f33da80 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,6 +34,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the SubtasksAllAccumulatorsHandler.
*/
@@ -55,7 +58,7 @@ public class SubtasksAllAccumulatorsHandlerTest {
@Test
public void testGetPaths() {
- SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(null);
+ SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 22a2d27..548efaf 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -34,6 +35,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
+import static org.mockito.Mockito.mock;
+
/**
* Tests for the SubtasksTimesHandler.
*/
@@ -56,7 +59,7 @@ public class SubtasksTimesHandlerTest {
@Test
public void testGetPaths() {
- SubtasksTimesHandler handler = new SubtasksTimesHandler(null);
+ SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index 5846d75..cf59f05 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -21,17 +21,16 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -46,19 +45,12 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future$;
-import scala.concurrent.duration.FiniteDuration;
-
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.powermock.api.mockito.PowerMockito.mock;
@@ -72,9 +64,9 @@ public class TaskManagerLogHandlerTest {
public void testGetPaths() {
TaskManagerLogHandler handlerLog = new TaskManagerLogHandler(
mock(JobManagerRetriever.class),
- mock(ExecutionContextExecutor.class),
- Future$.MODULE$.successful("/jm/address"),
- AkkaUtils.getDefaultClientTimeout(),
+ Executors.directExecutor(),
+ CompletableFuture.completedFuture("/jm/address"),
+ TestingUtils.TIMEOUT(),
TaskManagerLogHandler.FileMode.LOG,
new Configuration(),
false,
@@ -85,9 +77,9 @@ public class TaskManagerLogHandlerTest {
TaskManagerLogHandler handlerOut = new TaskManagerLogHandler(
mock(JobManagerRetriever.class),
- mock(ExecutionContextExecutor.class),
- Future$.MODULE$.successful("/jm/address"),
- AkkaUtils.getDefaultClientTimeout(),
+ Executors.directExecutor(),
+ CompletableFuture.completedFuture("/jm/address"),
+ TestingUtils.TIMEOUT(),
TaskManagerLogHandler.FileMode.STDOUT,
new Configuration(),
false,
@@ -115,27 +107,21 @@ public class TaskManagerLogHandlerTest {
// ========= setup JobManager ==================================================================================
- ActorGateway jobManagerGateway = mock(ActorGateway.class);
- Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
- JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
-
- when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
- .thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
- when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()), any(FiniteDuration.class)))
- .thenReturn(Future$.MODULE$.successful((Object) 5));
- when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class), any(FiniteDuration.class)))
- .thenReturn(Future$.MODULE$.successful((Object) new JobManagerMessages.TaskManagerInstance(Option.apply(taskManager))));
- when(jobManagerGateway.path()).thenReturn("/jm/address");
+ JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+ when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337));
+ when(jobManagerGateway.getHostname()).thenReturn("localhost");
+ when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), any(Time.class))).thenReturn(
+ CompletableFuture.completedFuture(Optional.of(taskManager)));
JobManagerRetriever retriever = mock(JobManagerRetriever.class);
- when(retriever.getJobManagerGatewayAndWebPort())
- .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+ when(retriever.getJobManagerGatewayNow())
+ .thenReturn(Optional.of(jobManagerGateway));
TaskManagerLogHandler handler = new TaskManagerLogHandler(
retriever,
- ExecutionContext$.MODULE$.fromExecutor(Executors.directExecutor()),
- Future$.MODULE$.successful("/jm/address"),
- AkkaUtils.getDefaultClientTimeout(),
+ Executors.directExecutor(),
+ CompletableFuture.completedFuture("/jm/address"),
+ TestingUtils.TIMEOUT(),
TaskManagerLogHandler.FileMode.LOG,
new Configuration(),
false,
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
index 17e7e9d..2992d91 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
@@ -18,14 +18,13 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.api.common.time.Time;
+
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
/**
* Tests for the TaskManagersHandler.
@@ -33,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
public class TaskManagersHandlerTest {
@Test
public void testGetPaths() {
- TaskManagersHandler handler = new TaskManagersHandler(new FiniteDuration(0, TimeUnit.SECONDS), null);
+ TaskManagersHandler handler = new TaskManagersHandler(Time.seconds(0L), null);
String[] paths = handler.getPaths();
Assert.assertEquals(2, paths.length);
List<String> pathsList = Lists.newArrayList(paths);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index b032061..90e032d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -18,17 +18,17 @@
package org.apache.flink.runtime.webmonitor.metrics;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorSystem;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import scala.concurrent.ExecutionContext;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.powermock.api.mockito.PowerMockito.mock;
@@ -42,7 +42,11 @@ public class AbstractMetricsHandlerTest extends TestLogger {
*/
@Test
public void testHandleRequest() throws Exception {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStoreTest.setupStore(fetcher.getMetricStore());
JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
@@ -91,7 +95,11 @@ public class AbstractMetricsHandlerTest extends TestLogger {
*/
@Test
public void testInvalidListDoesNotFail() {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStoreTest.setupStore(fetcher.getMetricStore());
JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
@@ -117,7 +125,11 @@ public class AbstractMetricsHandlerTest extends TestLogger {
*/
@Test
public void testInvalidGetDoesNotFail() {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStoreTest.setupStore(fetcher.getMetricStore());
JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index 97c2055..994fc5e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -18,18 +18,18 @@
package org.apache.flink.runtime.webmonitor.metrics;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorSystem;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import scala.concurrent.ExecutionContext;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.powermock.api.mockito.PowerMockito.mock;
@@ -48,7 +48,11 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
@Test
public void getMapFor() {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
@@ -62,7 +66,11 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
@Test
public void getMapForNull() {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStore store = fetcher.getMetricStore();
JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index 53666eb..a35af22 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -18,18 +18,18 @@
package org.apache.flink.runtime.webmonitor.metrics;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorSystem;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import scala.concurrent.ExecutionContext;
-
import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -49,7 +49,11 @@ public class JobMetricsHandlerTest extends TestLogger {
@Test
public void getMapFor() throws Exception {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
JobMetricsHandler handler = new JobMetricsHandler(fetcher);
@@ -64,7 +68,11 @@ public class JobMetricsHandlerTest extends TestLogger {
@Test
public void getMapForNull() {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStore store = fetcher.getMetricStore();
JobMetricsHandler handler = new JobMetricsHandler(fetcher);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index 5f68c6f..e84b11d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -18,18 +18,18 @@
package org.apache.flink.runtime.webmonitor.metrics;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorSystem;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import scala.concurrent.ExecutionContext;
-
import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
import static org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
import static org.junit.Assert.assertEquals;
@@ -50,7 +50,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
@Test
public void getMapFor() throws Exception {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
@@ -68,7 +72,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
@Test
public void getMapForNull() {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStore store = fetcher.getMetricStore();
JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 369e8aa..4c91997 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -26,49 +27,39 @@ import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.TestingHistogram;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Executor;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future$;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.metrics.dump.MetricQueryService.METRIC_QUERY_SERVICE_NAME;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
/**
* Tests for the MetricFetcher.
@@ -78,6 +69,8 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
public class MetricFetcherTest extends TestLogger {
@Test
public void testUpdate() throws Exception {
+ final Time timeout = Time.seconds(10L);
+
// ========= setup TaskManager =================================================================================
JobID jobID = new JobID();
InstanceID tmID = new InstanceID();
@@ -94,58 +87,40 @@ public class MetricFetcherTest extends TestLogger {
JobDetails details = mock(JobDetails.class);
when(details.getJobId()).thenReturn(jobID);
- ActorGateway jobManagerGateway = mock(ActorGateway.class);
- Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
- JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+ JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
- when(jobManagerGateway.ask(isA(RequestJobDetails.class), any(FiniteDuration.class)))
- .thenReturn(Future$.MODULE$.successful((Object) new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
- when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
- .thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
- when(jobManagerGateway.path()).thenReturn("/jm/address");
+ when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
+ .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
+ when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
+ .thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
+ when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
+ when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(0));
- JobManagerRetriever retriever = mock(JobManagerRetriever.class);
- when(retriever.getJobManagerGatewayAndWebPort())
- .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+ AkkaJobManagerRetriever retriever = mock(AkkaJobManagerRetriever.class);
+ when(retriever.getJobManagerGatewayNow())
+ .thenReturn(Optional.of(jobManagerGateway));
// ========= setup QueryServices ================================================================================
- Object requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
-
- final ActorRef jmQueryService = mock(ActorRef.class);
- final ActorRef tmQueryService = mock(ActorRef.class);
-
- ActorSystem actorSystem = mock(ActorSystem.class);
- when(actorSystem.actorFor(eq("/jm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService);
- when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(tmQueryService);
-
- MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
- when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
- .thenReturn(Future$.MODULE$.successful((Object) new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
-
- MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
- when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
- .thenReturn(Future$.MODULE$.successful(requestMetricsAnswer));
-
- whenNew(MetricFetcher.BasicGateway.class)
- .withArguments(eq(new Object() {
- @Override
- public boolean equals(Object o) {
- return o == jmQueryService;
- }
- }))
- .thenReturn(jmQueryServiceGateway);
- whenNew(MetricFetcher.BasicGateway.class)
- .withArguments(eq(new Object() {
- @Override
- public boolean equals(Object o) {
- return o == tmQueryService;
- }
- }))
- .thenReturn(tmQueryServiceGateway);
+ MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class);
+ MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class);
+
+ MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
+
+ when(jmQueryService.queryMetrics(any(Time.class)))
+ .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
+ when(tmQueryService.queryMetrics(any(Time.class)))
+ .thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
+
+ MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
+ when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
+ when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
// ========= start MetricFetcher testing =======================================================================
- ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(new CurrentThreadExecutor());
- MetricFetcher fetcher = new MetricFetcher(actorSystem, retriever, context);
+ MetricFetcher fetcher = new MetricFetcher(
+ retriever,
+ queryServiceRetriever,
+ Executors.directExecutor(),
+ timeout);
// verify that update fetches metrics and updates the store
fetcher.update();
@@ -170,13 +145,7 @@ public class MetricFetcherTest extends TestLogger {
}
}
- private static class CurrentThreadExecutor implements Executor {
- public void execute(Runnable r) {
- r.run();
- }
- }
-
- private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
+ private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) {
Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index 4333f04..c20ea98 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -18,18 +18,18 @@
package org.apache.flink.runtime.webmonitor.metrics;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorSystem;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import scala.concurrent.ExecutionContext;
-
import static org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler.TASK_MANAGER_ID_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -49,7 +49,11 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
@Test
public void getMapFor() throws Exception {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
@@ -64,7 +68,11 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
@Test
public void getMapForNull() {
- MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+ MetricFetcher fetcher = new MetricFetcher(
+ mock(JobManagerRetriever.class),
+ mock(MetricQueryServiceRetriever.class),
+ Executors.directExecutor(),
+ TestingUtils.TIMEOUT());
MetricStore store = fetcher.getMetricStore();
TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 6ee78dd..bbc5889 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -22,13 +22,23 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.util.Preconditions;
+import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -37,7 +47,8 @@ import scala.Option;
import scala.reflect.ClassTag$;
/**
- * Implementation of the {@link JobManagerGateway} for the {@link ActorGateway}.
+ * Implementation of the {@link JobManagerGateway} for old JobManager code based
+ * on Akka actors and the {@link ActorGateway}.
*/
public class AkkaJobManagerGateway implements JobManagerGateway {
@@ -48,7 +59,6 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);
final Option<String> optHostname = jobManagerGateway.actor().path().address().host();
-
hostname = optHostname.isDefined() ? optHostname.get() : "localhost";
}
@@ -63,25 +73,6 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
}
@Override
- public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) {
- return FutureUtils
- .toJava(jobManagerGateway
- .ask(
- new JobManagerMessages.RequestClassloadingProps(jobId),
- FutureUtils.toFiniteDuration(timeout)))
- .thenApply(
- (Object response) -> {
- if (response instanceof JobManagerMessages.ClassloadingProps) {
- return Optional.of(((JobManagerMessages.ClassloadingProps) response));
- } else if (response instanceof JobManagerMessages.JobNotFound) {
- return Optional.empty();
- } else {
- throw new FlinkFutureException("Unknown response: " + response + '.');
- }
- });
- }
-
- @Override
public CompletableFuture<Integer> requestBlobServerPort(Time timeout) {
return FutureUtils.toJava(
jobManagerGateway
@@ -90,6 +81,21 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
}
@Override
+ public CompletableFuture<Integer> requestWebPort(Time timeout) {
+ CompletableFuture<JobManagerMessages.ResponseWebMonitorPort> portResponseFuture = FutureUtils.toJava(
+ jobManagerGateway
+ .ask(JobManagerMessages.getRequestWebMonitorPort(), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.ResponseWebMonitorPort.class)));
+
+ return portResponseFuture.thenApply(
+ JobManagerMessages.ResponseWebMonitorPort::port);
+ }
+
+ //--------------------------------------------------------------------------------
+ // Job control
+ //--------------------------------------------------------------------------------
+
+ @Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout) {
return FutureUtils
.toJava(
@@ -119,4 +125,146 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
}
);
}
+
+ @Override
+ public CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, String savepointPath, Time timeout) {
+ CompletableFuture<JobManagerMessages.CancellationResponse> cancellationFuture = FutureUtils.toJava(
+ jobManagerGateway
+ .ask(new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointPath), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));
+
+ return cancellationFuture.thenApply(
+ (JobManagerMessages.CancellationResponse response) -> {
+ if (response instanceof JobManagerMessages.CancellationSuccess) {
+ return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
+ } else {
+ throw new FlinkFutureException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause());
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
+ CompletableFuture<JobManagerMessages.CancellationResponse> responseFuture = FutureUtils.toJava(
+ jobManagerGateway
+ .ask(new JobManagerMessages.CancelJob(jobId), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));
+
+ return responseFuture.thenApply(
+ (JobManagerMessages.CancellationResponse response) -> {
+ if (response instanceof JobManagerMessages.CancellationSuccess) {
+ return Acknowledge.get();
+ } else {
+ throw new FlinkFutureException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause());
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
+ CompletableFuture<JobManagerMessages.StoppingResponse> responseFuture = FutureUtils.toJava(
+ jobManagerGateway
+ .ask(new JobManagerMessages.StopJob(jobId), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.StoppingResponse.class)));
+
+ return responseFuture.thenApply(
+ (JobManagerMessages.StoppingResponse response) -> {
+ if (response instanceof JobManagerMessages.StoppingSuccess) {
+ return Acknowledge.get();
+ } else {
+ throw new FlinkFutureException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause());
+ }
+ });
+ }
+
+ //--------------------------------------------------------------------------------
+ // JobManager information
+ //--------------------------------------------------------------------------------
+
+ @Override
+ public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(InstanceID instanceId, Time timeout) {
+ return FutureUtils.toJava(
+ jobManagerGateway
+ .ask(new JobManagerMessages.RequestTaskManagerInstance(instanceId), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)))
+ .thenApply(
+ (JobManagerMessages.TaskManagerInstance taskManagerResponse) -> {
+ if (taskManagerResponse.instance().isDefined()) {
+ return Optional.of(taskManagerResponse.instance().get());
+ } else {
+ return Optional.empty();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout) {
+ CompletableFuture<JobManagerMessages.RegisteredTaskManagers> taskManagersFuture = FutureUtils.toJava(
+ jobManagerGateway
+ .ask(JobManagerMessages.getRequestRegisteredTaskManagers(), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.RegisteredTaskManagers.class)));
+
+ return taskManagersFuture.thenApply(
+ JobManagerMessages.RegisteredTaskManagers::asJavaCollection);
+ }
+
+ @Override
+ public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) {
+ return FutureUtils
+ .toJava(jobManagerGateway
+ .ask(
+ new JobManagerMessages.RequestClassloadingProps(jobId),
+ FutureUtils.toFiniteDuration(timeout)))
+ .thenApply(
+ (Object response) -> {
+ if (response instanceof JobManagerMessages.ClassloadingProps) {
+ return Optional.of(((JobManagerMessages.ClassloadingProps) response));
+ } else if (response instanceof JobManagerMessages.JobNotFound) {
+ return Optional.empty();
+ } else {
+ throw new FlinkFutureException("Unknown response: " + response + '.');
+ }
+ });
+ }
+
+ /**
+ * Requests the details of the jobs known to the JobManager by asking the underlying
+ * {@link ActorGateway} with a {@link RequestJobDetails} message.
+ *
+ * @param includeRunning true if running jobs shall be included, otherwise false
+ * @param includeFinished true if finished jobs shall be included, otherwise false
+ * @param timeout for the asynchronous operation
+ * @return Future containing the job details
+ */
+ @Override
+ public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
+ // Forward the caller's flags; hard-coding (true, true) would silently ignore the requested filter.
+ return FutureUtils.toJava(
+ jobManagerGateway
+ .ask(new RequestJobDetails(includeRunning, includeFinished), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.apply(MultipleJobsDetails.class)));
+ }
+
+ @Override
+ public CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId, Time timeout) {
+ CompletableFuture<JobManagerMessages.JobResponse> jobResponseFuture = FutureUtils.toJava(
+ jobManagerGateway
+ .ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobResponse.class)));
+
+ return jobResponseFuture.thenApply(
+ (JobManagerMessages.JobResponse jobResponse) -> {
+ if (jobResponse instanceof JobManagerMessages.JobFound) {
+ return Optional.of(((JobManagerMessages.JobFound) jobResponse).executionGraph());
+ } else {
+ return Optional.empty();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<StatusOverview> requestStatusOverview(Time timeout) {
+ return FutureUtils.toJava(
+ jobManagerGateway
+ .ask(RequestStatusOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.apply(StatusOverview.class)));
+ }
+
+ @Override
+ public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) {
+ return FutureUtils.toJava(
+ jobManagerGateway
+ .ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
+ .mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class)));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 562e697..19f0e2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -421,7 +421,9 @@ public class JobClient {
LOG.info("Checking and uploading JAR files");
- final CompletableFuture<InetSocketAddress> blobServerAddressFuture = retrieveBlobServerAddress(jobManagerGateway, timeout);
+ final CompletableFuture<InetSocketAddress> blobServerAddressFuture = retrieveBlobServerAddress(
+ jobManagerGateway,
+ timeout);
final InetSocketAddress blobServerAddress;
@@ -448,7 +450,7 @@ public class JobClient {
"JobManager did not respond within " + timeout, e);
} catch (Throwable throwable) {
Throwable stripped = ExceptionUtils.stripExecutionException(throwable);
-
+
try {
ExceptionUtils.tryDeserializeAndThrow(stripped, classLoader);
} catch (JobExecutionException jee) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index f204393..d24a3d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -18,14 +18,14 @@
package org.apache.flink.runtime.clusterframework;
-import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.Address;
import com.typesafe.config.Config;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.lang3.StringUtils;
+
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
@@ -52,6 +54,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Tools for starting JobManager and TaskManager processes, including the
@@ -171,7 +174,11 @@ public class BootstrapTools {
*
* @param config The Flink config.
* @param highAvailabilityServices Service factory for high availability services
- * @param actorSystem The ActorSystem to start the web frontend in.
+ * @param jobManagerRetriever to retrieve the leading JobManagerGateway
+ * @param queryServiceRetriever to resolve a query service
+ * @param timeout for asynchronous operations
+ * @param executor to run asynchronous operations
+ * @param jobManagerAddress the address of the JobManager for which the WebMonitor is started
* @param logger Logger for log output
* @return WebMonitor instance.
* @throws Exception
@@ -179,16 +186,13 @@ public class BootstrapTools {
public static WebMonitor startWebMonitorIfConfigured(
Configuration config,
HighAvailabilityServices highAvailabilityServices,
- ActorSystem actorSystem,
- ActorRef jobManager,
+ JobManagerRetriever jobManagerRetriever,
+ MetricQueryServiceRetriever queryServiceRetriever,
+ Time timeout,
+ Executor executor,
+ String jobManagerAddress,
Logger logger) throws Exception {
-
- // this ensures correct values are present in the web frontend
- final Address address = AkkaUtils.getAddress(actorSystem);
- config.setString(JobManagerOptions.ADDRESS, address.host().get());
- config.setInteger(JobManagerOptions.PORT, Integer.parseInt(address.port().get().toString()));
-
if (config.getInteger(WebOptions.PORT, 0) >= 0) {
logger.info("Starting JobManager Web Frontend");
@@ -197,12 +201,14 @@ public class BootstrapTools {
WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor(
config,
highAvailabilityServices,
- actorSystem);
+ jobManagerRetriever,
+ queryServiceRetriever,
+ timeout,
+ executor);
// start the web monitor
if (monitor != null) {
- String jobManagerAkkaURL = AkkaUtils.getAkkaURL(actorSystem, jobManager);
- monitor.start(jobManagerAkkaURL);
+ monitor.start(jobManagerAddress);
}
return monitor;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 043c603..5c6439d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -305,6 +305,16 @@ public class FutureUtils {
return new FiniteDuration(time.toMilliseconds(), TimeUnit.MILLISECONDS);
}
+ /**
+ * Converts {@link FiniteDuration} into Flink time.
+ *
+ * @param finiteDuration to convert into Flink time
+ * @return Flink time with the length of the given finite duration
+ */
+ public static Time toTime(FiniteDuration finiteDuration) {
+ return Time.milliseconds(finiteDuration.toMillis());
+ }
+
// ------------------------------------------------------------------------
// Converting futures
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index cba7b06..a4d0d11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -21,11 +21,20 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.rpc.RpcGateway;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -38,21 +47,19 @@ import java.util.concurrent.CompletableFuture;
public interface JobManagerGateway extends RpcGateway {
/**
- * Requests the class loading properties for the given JobID.
+ * Requests the BlobServer port.
*
- * @param jobId for which the class loading properties are requested
* @param timeout for this operation
- * @return Future containing the optional class loading properties if they could be retrieved from the JobManager.
+ * @return Future containing the BlobServer port
*/
- CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);
+ CompletableFuture<Integer> requestBlobServerPort(Time timeout);
/**
- * Requests the BlobServer port.
+ * Requests the port of the web runtime monitor serving requests for the JobManager endpoint.
*
- * @param timeout for this operation
- * @return Future containing the BlobServer port
+ * @param timeout for this operation
+ * @return Future containing the port of the WebRuntimeMonitor responsible for the JobManager endpoint
*/
- CompletableFuture<Integer> requestBlobServerPort(Time timeout);
+ CompletableFuture<Integer> requestWebPort(Time timeout);
/**
* Submits a job to the JobManager.
@@ -63,4 +70,103 @@ public interface JobManagerGateway extends RpcGateway {
* @return Future containing an Acknowledge message if the submission succeeded
*/
CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout);
+
+ /**
+ * Cancels the given job after taking a savepoint and returns the path of that savepoint.
+ *
+ * <p>If the savepointPath is null, then the JobManager will use the default savepoint directory
+ * to store the savepoint in. After the savepoint has been taken and the job has been canceled
+ * successfully, the path of the savepoint is returned.
+ *
+ * @param jobId identifying the job to cancel
+ * @param savepointPath Optional path for the savepoint to be stored under; if null, then the default path is
+ * taken
+ * @param timeout for the asynchronous operation
+ * @return Future containing the savepoint path of the taken savepoint or an Exception if the operation failed
+ */
+ CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable String savepointPath, Time timeout);
+
+ /**
+ * Cancels the given job.
+ *
+ * @param jobId identifying the job to cancel
+ * @param timeout for the asynchronous operation
+ * @return Future containing Acknowledge or an Exception if the operation failed
+ */
+ CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout);
+
+ /**
+ * Stops the given job.
+ *
+ * @param jobId identifying the job to stop
+ * @param timeout for the asynchronous operation
+ * @return Future containing Acknowledge or an Exception if the operation failed
+ */
+ CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout);
+
+ /**
+ * Requests the class loading properties for the given JobID.
+ *
+ * @param jobId for which the class loading properties are requested
+ * @param timeout for this operation
+ * @return Future containing the optional class loading properties if they could be retrieved from the JobManager.
+ */
+ CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);
+
+ /**
+ * Requests the TaskManager instance registered under the given instanceId from the JobManager.
+ * If there is no Instance registered, then {@link Optional#empty()} is returned.
+ *
+ * @param instanceId for which to retrieve the Instance
+ * @param timeout for the asynchronous operation
+ * @return Future containing the TaskManager instance registered under instanceId, otherwise {@link Optional#empty()}
+ */
+ CompletableFuture<Optional<Instance>> requestTaskManagerInstance(InstanceID instanceId, Time timeout);
+
+ /**
+ * Requests all currently registered TaskManager instances from the JobManager.
+ *
+ * @param timeout for the asynchronous operation
+ * @return Future containing the collection of all currently registered TaskManager instances
+ */
+ CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout);
+
+ /**
+ * Requests the details of the jobs known to the JobManager, optionally including running and/or finished jobs.
+ *
+ * @param includeRunning true if running jobs shall be included, otherwise false
+ * @param includeFinished true if finished jobs shall be included, otherwise false
+ * @param timeout for the asynchronous operation
+ * @return Future containing the job details
+ */
+ CompletableFuture<MultipleJobsDetails> requestJobDetails(
+ boolean includeRunning,
+ boolean includeFinished,
+ Time timeout);
+
+ /**
+ * Requests the AccessExecutionGraph for the given jobId. If there is no such graph, then
+ * {@link Optional#empty()} is returned.
+ *
+ * @param jobId identifying the job whose AccessExecutionGraph is requested
+ * @param timeout for the asynchronous operation
+ * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link Optional#empty()}
+ */
+ CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId, Time timeout);
+
+ /**
+ * Requests the status overview from the JobManager.
+ *
+ * @param timeout for the asynchronous operation
+ * @return Future containing the status overview
+ */
+ CompletableFuture<StatusOverview> requestStatusOverview(Time timeout);
+
+ /**
+ * Requests the job overview from the JobManager.
+ *
+ * @param timeout for the asynchronous operation
+ * @return Future containing the job overview
+ */
+ CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index e173522..1791fe1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -25,6 +25,7 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.util.Preconditions;
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 9ebb126..9493696 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.Path;
@@ -31,8 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import akka.actor.ActorSystem;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -47,6 +49,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Utilities for the web runtime monitor. This class contains for example methods to build
@@ -119,27 +122,39 @@ public final class WebMonitorUtils {
*
* @param config The configuration for the runtime monitor.
* @param highAvailabilityServices HighAvailabilityServices used to start the WebRuntimeMonitor
- * @param actorSystem ActorSystem used to connect to the JobManager
- *
+ * @param jobManagerRetriever which retrieves the currently leading JobManager
+ * @param queryServiceRetriever which retrieves the query service
+ * @param timeout for asynchronous operations
+ * @param executor to run asynchronous operations
*/
public static WebMonitor startWebRuntimeMonitor(
Configuration config,
HighAvailabilityServices highAvailabilityServices,
- ActorSystem actorSystem) {
+ JobManagerRetriever jobManagerRetriever,
+ MetricQueryServiceRetriever queryServiceRetriever,
+ Time timeout,
+ Executor executor) {
// try to load and instantiate the class
try {
String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
Class<? extends WebMonitor> clazz = Class.forName(classname).asSubclass(WebMonitor.class);
- Constructor<? extends WebMonitor> constructor = clazz.getConstructor(Configuration.class,
+ Constructor<? extends WebMonitor> constructor = clazz.getConstructor(
+ Configuration.class,
LeaderRetrievalService.class,
BlobView.class,
- ActorSystem.class);
+ JobManagerRetriever.class,
+ MetricQueryServiceRetriever.class,
+ Time.class,
+ Executor.class);
return constructor.newInstance(
config,
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
highAvailabilityServices.createBlobStore(),
- actorSystem);
+ jobManagerRetriever,
+ queryServiceRetriever,
+ timeout,
+ executor);
} catch (ClassNotFoundException e) {
LOG.error("Could not load web runtime monitor. " +
"Probably reason: flink-runtime-web is not in the classpath");
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
new file mode 100644
index 0000000..2eade48
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Retrieves and stores the JobManagerGateway for the current leading JobManager.
+ */
+public abstract class JobManagerRetriever implements LeaderRetrievalListener {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ // False if we have to create a new JobManagerGateway future when being notified
+ // about a new leader address
+ private final AtomicBoolean firstTimeUsage;
+
+ private volatile CompletableFuture<JobManagerGateway> jobManagerGatewayFuture;
+
+ public JobManagerRetriever() {
+ firstTimeUsage = new AtomicBoolean(true);
+ jobManagerGatewayFuture = new CompletableFuture<>();
+ }
+
+ /**
+ * Returns the currently known leading job manager gateway, or {@link Optional#empty()} if it is not yet available.
+ */
+ public Optional<JobManagerGateway> getJobManagerGatewayNow() throws Exception {
+ if (jobManagerGatewayFuture != null) {
+ CompletableFuture<JobManagerGateway> jobManagerGatewayFuture = this.jobManagerGatewayFuture;
+
+ if (jobManagerGatewayFuture.isDone()) {
+ return Optional.of(jobManagerGatewayFuture.get());
+ } else {
+ return Optional.empty();
+ }
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Returns the current JobManagerGateway future.
+ */
+ public CompletableFuture<JobManagerGateway> getJobManagerGateway() throws Exception {
+ return jobManagerGatewayFuture;
+ }
+
+ @Override
+ public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+ if (leaderAddress != null && !leaderAddress.equals("")) {
+ try {
+ final CompletableFuture<JobManagerGateway> newJobManagerGatewayFuture;
+
+ if (firstTimeUsage.compareAndSet(true, false)) {
+ newJobManagerGatewayFuture = jobManagerGatewayFuture;
+ } else {
+ newJobManagerGatewayFuture = new CompletableFuture<>();
+ jobManagerGatewayFuture = newJobManagerGatewayFuture;
+ }
+
+ log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
+
+ createJobManagerGateway(leaderAddress, leaderSessionID).whenComplete(
+ (JobManagerGateway jobManagerGateway, Throwable throwable) -> {
+ if (throwable != null) {
+ newJobManagerGatewayFuture.completeExceptionally(new FlinkException("Could not retrieve" +
+ "the current job manager gateway.", throwable));
+ } else {
+ newJobManagerGatewayFuture.complete(jobManagerGateway);
+ }
+ }
+ );
+ }
+ catch (Exception e) {
+ handleError(e);
+ }
+ }
+ }
+
+ @Override
+ public void handleError(Exception exception) {
+ log.error("Received error from LeaderRetrievalService.", exception);
+
+ jobManagerGatewayFuture.completeExceptionally(exception);
+ }
+
+ /**
+ * Create a JobManagerGateway for the given leader address and leader id.
+ *
+ * @param leaderAddress to connect against
+ * @param leaderId leader session id which the endpoint currently uses
+ * @return Future containing the resolved JobManagerGateway
+ * @throws Exception if the JobManagerGateway creation failed
+ */
+ protected abstract CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
new file mode 100644
index 0000000..c79bf5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Gateway to communicate with a QueryService.
+ *
+ * <p>Currently there is only one implementation working with an Akka based
+ * MetricQueryService {@link AkkaQueryServiceGateway}.
+ */
+public interface MetricQueryServiceGateway {
+
+ CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
new file mode 100644
index 0000000..7bb9b44
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Retriever for {@link MetricQueryServiceGateway}.
+ */
+public interface MetricQueryServiceRetriever {
+
+ /**
+ * Retrieves a {@link MetricQueryServiceGateway} for the given query service path.
+ *
+ * @param queryServicePath under which the QueryService can be reached
+ * @return Future containing the resolved QueryServiceGateway
+ */
+ CompletableFuture<MetricQueryServiceGateway> retrieveService(String queryServicePath);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
new file mode 100644
index 0000000..027b42a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link JobManagerRetriever} implementation for Akka based JobManagers.
+ */
+public class AkkaJobManagerRetriever extends JobManagerRetriever {
+
+ private final ActorSystem actorSystem;
+ private final Time timeout;
+
+ public AkkaJobManagerRetriever(
+ ActorSystem actorSystem,
+ Time timeout) {
+
+ this.actorSystem = Preconditions.checkNotNull(actorSystem);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+
+ @Override
+ protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
+ return FutureUtils.toJava(
+ AkkaUtils.getActorRefFuture(
+ leaderAddress,
+ actorSystem,
+ FutureUtils.toFiniteDuration(timeout)))
+ .thenApplyAsync(
+ (ActorRef jobManagerRef) -> {
+ ActorGateway leaderGateway = new AkkaActorGateway(
+ jobManagerRef, leaderId);
+
+ return new AkkaJobManagerGateway(leaderGateway);
+ },
+ actorSystem.dispatcher());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
new file mode 100644
index 0000000..8985205
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+
+import java.util.concurrent.CompletableFuture;
+
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link MetricQueryServiceGateway} implementation for Akka based {@link MetricQueryService}.
+ */
+public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {
+
+ private final ActorRef queryServiceActorRef;
+
+ public AkkaQueryServiceGateway(ActorRef queryServiceActorRef) {
+ this.queryServiceActorRef = Preconditions.checkNotNull(queryServiceActorRef);
+ }
+
+ @Override
+ public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
+ return FutureUtils.toJava(
+ Patterns.ask(queryServiceActorRef, MetricQueryService.getCreateDump(), timeout.toMilliseconds())
+ .mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
new file mode 100644
index 0000000..7de436a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link MetricQueryServiceRetriever} implementation for Akka based {@link MetricQueryService}.
+ */
+public class AkkaQueryServiceRetriever implements MetricQueryServiceRetriever {
+ private final ActorSystem actorSystem;
+ private final Time lookupTimeout;
+
+ public AkkaQueryServiceRetriever(ActorSystem actorSystem, Time lookupTimeout) {
+ this.actorSystem = Preconditions.checkNotNull(actorSystem);
+ this.lookupTimeout = Preconditions.checkNotNull(lookupTimeout);
+ }
+
+ @Override
+ public CompletableFuture<MetricQueryServiceGateway> retrieveService(String queryServicePath) {
+ ActorSelection selection = actorSystem.actorSelection(queryServicePath);
+
+ return FutureUtils.toJava(selection.resolveOne(FutureUtils.toFiniteDuration(lookupTimeout))).thenApply(AkkaQueryServiceGateway::new);
+ }
+}
[4/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor
from ActorGateway
Posted by tr...@apache.org.
[FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway
This PR decouples the WebRuntimeMonitor from the ActorGateway by introducing
the JobManagerGateway interface which can have multiple implementations. This
is a preliminary step for the integration of the existing WebRuntimeMonitor
with the Flip-6 JobMaster.
Add time unit for web.timeout
This closes #4492.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f790d3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f790d3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f790d3e
Branch: refs/heads/master
Commit: 9f790d3efe5e8267da05eb97bbe07ca8a0f859fe
Parents: 00d5b62
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 2 18:43:00 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Aug 11 13:48:14 2017 +0200
----------------------------------------------------------------------
docs/ops/config.md | 2 +
.../apache/flink/configuration/WebOptions.java | 7 +
.../MesosApplicationMasterRunner.java | 13 +-
.../webmonitor/ExecutionGraphHolder.java | 59 ++----
.../runtime/webmonitor/JobManagerRetriever.java | 197 ------------------
.../webmonitor/RuntimeMonitorHandler.java | 16 +-
.../webmonitor/RuntimeMonitorHandlerBase.java | 36 ++--
.../runtime/webmonitor/WebRuntimeMonitor.java | 74 ++++---
.../files/StaticFileServerHandler.java | 35 ++--
.../AbstractExecutionGraphRequestHandler.java | 22 +-
.../handlers/AbstractJsonRequestHandler.java | 10 +-
.../handlers/ClusterOverviewHandler.java | 23 +--
.../handlers/CurrentJobIdsHandler.java | 22 +-
.../handlers/CurrentJobsOverviewHandler.java | 24 +--
.../handlers/DashboardConfigHandler.java | 4 +-
.../handlers/HandlerRedirectUtils.java | 41 ++--
.../handlers/JarAccessDeniedHandler.java | 4 +-
.../webmonitor/handlers/JarDeleteHandler.java | 4 +-
.../webmonitor/handlers/JarListHandler.java | 4 +-
.../webmonitor/handlers/JarPlanHandler.java | 4 +-
.../webmonitor/handlers/JarRunHandler.java | 20 +-
.../webmonitor/handlers/JarUploadHandler.java | 8 +-
.../handlers/JobCancellationHandler.java | 19 +-
.../JobCancellationWithSavepointHandlers.java | 124 ++++++------
.../handlers/JobManagerConfigHandler.java | 4 +-
.../webmonitor/handlers/JobStoppingHandler.java | 19 +-
.../webmonitor/handlers/RequestHandler.java | 9 +-
.../handlers/TaskManagerLogHandler.java | 66 +++---
.../handlers/TaskManagersHandler.java | 39 ++--
.../CheckpointStatsDetailsSubtasksHandler.java | 6 +-
.../metrics/AbstractMetricsHandler.java | 4 +-
.../webmonitor/metrics/MetricFetcher.java | 202 +++++++++----------
.../runtime/webmonitor/WebFrontendITCase.java | 27 +--
.../webmonitor/WebRuntimeMonitorITCase.java | 73 ++++---
.../handlers/ClusterOverviewHandlerTest.java | 8 +-
.../handlers/CurrentJobIdsHandlerTest.java | 8 +-
.../CurrentJobsOverviewHandlerTest.java | 10 +-
.../handlers/HandlerRedirectUtilsTest.java | 39 ++--
.../webmonitor/handlers/JarRunHandlerTest.java | 5 +-
.../handlers/JobAccumulatorsHandlerTest.java | 5 +-
.../handlers/JobCancellationHandlerTest.java | 4 +-
...obCancellationWithSavepointHandlersTest.java | 82 ++++----
.../handlers/JobConfigHandlerTest.java | 5 +-
.../handlers/JobDetailsHandlerTest.java | 5 +-
.../handlers/JobExceptionsHandlerTest.java | 5 +-
.../webmonitor/handlers/JobPlanHandlerTest.java | 5 +-
.../handlers/JobStoppingHandlerTest.java | 7 +-
.../JobVertexAccumulatorsHandlerTest.java | 5 +-
.../JobVertexBackPressureHandlerTest.java | 2 +-
.../handlers/JobVertexDetailsHandlerTest.java | 5 +-
.../JobVertexTaskManagersHandlerTest.java | 5 +-
...SubtaskCurrentAttemptDetailsHandlerTest.java | 6 +-
...ExecutionAttemptAccumulatorsHandlerTest.java | 5 +-
...btaskExecutionAttemptDetailsHandlerTest.java | 5 +-
.../SubtasksAllAccumulatorsHandlerTest.java | 5 +-
.../handlers/SubtasksTimesHandlerTest.java | 5 +-
.../handlers/TaskManagerLogHandlerTest.java | 54 ++---
.../handlers/TaskManagersHandlerTest.java | 7 +-
.../metrics/AbstractMetricsHandlerTest.java | 26 ++-
.../metrics/JobManagerMetricsHandlerTest.java | 20 +-
.../metrics/JobMetricsHandlerTest.java | 20 +-
.../metrics/JobVertexMetricsHandlerTest.java | 20 +-
.../webmonitor/metrics/MetricFetcherTest.java | 111 ++++------
.../metrics/TaskManagerMetricsHandlerTest.java | 20 +-
.../runtime/akka/AkkaJobManagerGateway.java | 190 +++++++++++++++--
.../apache/flink/runtime/client/JobClient.java | 6 +-
.../clusterframework/BootstrapTools.java | 34 ++--
.../flink/runtime/concurrent/FutureUtils.java | 10 +
.../runtime/jobmaster/JobManagerGateway.java | 122 ++++++++++-
.../metrics/dump/MetricDumpSerialization.java | 1 +
.../runtime/webmonitor/WebMonitorUtils.java | 29 ++-
.../retriever/JobManagerRetriever.java | 123 +++++++++++
.../retriever/MetricQueryServiceGateway.java | 36 ++++
.../retriever/MetricQueryServiceRetriever.java | 35 ++++
.../retriever/impl/AkkaJobManagerRetriever.java | 69 +++++++
.../retriever/impl/AkkaQueryServiceGateway.java | 53 +++++
.../impl/AkkaQueryServiceRetriever.java | 51 +++++
.../retriever/impl/RpcJobManagerRetriever.java | 46 +++++
.../flink/runtime/jobmanager/JobManager.scala | 10 +-
.../runtime/minicluster/FlinkMiniCluster.scala | 10 +-
.../runtime/testingUtils/TestingUtils.scala | 2 +
.../flink/yarn/YarnApplicationMasterRunner.java | 12 +-
82 files changed, 1551 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index c8d5c92..4138b4d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -389,6 +389,8 @@ These parameters allow for advanced tuning. The default values are sufficient wh
- `web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.
+- `web.timeout`: Timeout for asynchronous operations executed by the web frontend in milliseconds (DEFAULT: `10000`, 10 s)
+
### File Systems
The parameters define the behavior of tasks that create result files.
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index f499045..3733244 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -149,6 +149,13 @@ public class WebOptions {
.defaultValue(50)
.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples");
+ /**
+ * Timeout for asynchronous operations by the WebRuntimeMonitor in milliseconds.
+ */
+ public static final ConfigOption<Long> TIMEOUT = ConfigOptions
+ .key("web.timeout")
+ .defaultValue(10L * 1000L);
+
private WebOptions() {
throw new IllegalAccessError();
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 260b7f3..7891386 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,10 +18,12 @@
package org.apache.flink.mesos.runtime.clusterframework;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
@@ -52,6 +54,8 @@ import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -319,11 +323,16 @@ public class MesosApplicationMasterRunner {
// 2: the web monitor
LOG.debug("Starting Web Frontend");
+ Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
+
webMonitor = BootstrapTools.startWebMonitorIfConfigured(
config,
highAvailabilityServices,
- actorSystem,
- jobManager,
+ new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
+ new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
+ webMonitorTimeout,
+ futureExecutor,
+ AkkaUtils.getAkkaURL(actorSystem, jobManager),
LOG);
if (webMonitor != null) {
final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 75b0475..739b375 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -19,20 +19,19 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
import java.util.WeakHashMap;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -48,7 +47,7 @@ public class ExecutionGraphHolder {
private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
- private final FiniteDuration timeout;
+ private final Time timeout;
private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
@@ -56,50 +55,36 @@ public class ExecutionGraphHolder {
this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}
- public ExecutionGraphHolder(FiniteDuration timeout) {
+ public ExecutionGraphHolder(Time timeout) {
this.timeout = checkNotNull(timeout);
}
/**
- * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
+ * Retrieves the execution graph for the given {@link JobID}, wrapped in an {@link Optional}, or
+ * returns {@link Optional#empty()} if it cannot be found.
*
* @param jid jobID of the execution graph to be retrieved
- * @return the retrieved execution graph or null if it is not retrievable
+ * @return an Optional containing the ExecutionGraph if it could be retrieved, or an empty Optional if no ExecutionGraph was found
+ * @throws Exception if the ExecutionGraph retrieval failed.
*/
- public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
+ public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception {
AccessExecutionGraph cached = cache.get(jid);
if (cached != null) {
if (cached.getState() == JobStatus.SUSPENDED) {
cache.remove(jid);
} else {
- return cached;
+ return Optional.of(cached);
}
}
- try {
- if (jobManager != null) {
- Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
- Object result = Await.result(future, timeout);
-
- if (result instanceof JobManagerMessages.JobNotFound) {
- return null;
- }
- else if (result instanceof JobManagerMessages.JobFound) {
- AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
- cache.put(jid, eg);
- return eg;
- }
- else {
- throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
- }
- }
- else {
- LOG.warn("No connection to the leading JobManager.");
- return null;
- }
- }
- catch (Exception e) {
- throw new RuntimeException("Error requesting execution graph", e);
- }
+ CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
+
+ Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ return result.map((executionGraph) -> {
+ cache.put(jid, executionGraph);
+
+ return executionGraph;
+ });
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
deleted file mode 100644
index 175a4b8..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.ResponseWebMonitorPort;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Retrieves and stores the actor gateway to the current leading JobManager. In case of an error,
- * the {@link WebRuntimeMonitor} to which this instance is associated will be stopped.
- *
- * <p>The job manager gateway only works if the web monitor and the job manager run in the same
- * actor system, because many execution graph structures are not serializable. This breaks the nice
- * leader retrieval abstraction and we have a special code path in case that another job manager is
- * leader (see {@link org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils}. In such a
- * case, we get the address of the web monitor of the leading job manager and redirect to it
- * (instead of directly communicating with it).
- */
-public class JobManagerRetriever implements LeaderRetrievalListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(JobManagerRetriever.class);
-
- private final Object waitLock = new Object();
-
- private final WebMonitor webMonitor;
- private final ActorSystem actorSystem;
- private final FiniteDuration lookupTimeout;
- private final FiniteDuration timeout;
-
- private volatile Future<Tuple2<ActorGateway, Integer>> leaderGatewayPortFuture;
-
- public JobManagerRetriever(
- WebMonitor webMonitor,
- ActorSystem actorSystem,
- FiniteDuration lookupTimeout,
- FiniteDuration timeout) {
-
- this.webMonitor = checkNotNull(webMonitor);
- this.actorSystem = checkNotNull(actorSystem);
- this.lookupTimeout = checkNotNull(lookupTimeout);
- this.timeout = checkNotNull(timeout);
- }
-
- /**
- * Returns the currently known leading job manager gateway and its web monitor port.
- */
- public Option<Tuple2<ActorGateway, Integer>> getJobManagerGatewayAndWebPort() throws Exception {
- if (leaderGatewayPortFuture != null) {
- Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = leaderGatewayPortFuture;
-
- if (gatewayPortFuture.isCompleted()) {
- Tuple2<ActorGateway, Integer> gatewayPort = Await.result(gatewayPortFuture, timeout);
-
- return Option.apply(gatewayPort);
- } else {
- return Option.empty();
- }
- } else {
- return Option.empty();
- }
- }
-
- /**
- * Awaits the leading job manager gateway and its web monitor port.
- */
- public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception {
- Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null;
- Deadline deadline = timeout.fromNow();
-
- while (!deadline.isOverdue()) {
- synchronized (waitLock) {
- gatewayPortFuture = leaderGatewayPortFuture;
-
- if (gatewayPortFuture != null) {
- break;
- }
-
- waitLock.wait(deadline.timeLeft().toMillis());
- }
- }
-
- if (gatewayPortFuture == null) {
- throw new TimeoutException("There is no JobManager available.");
- } else {
- return Await.result(gatewayPortFuture, deadline.timeLeft());
- }
- }
-
- @Override
- public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
- if (leaderAddress != null && !leaderAddress.equals("")) {
- try {
- final Promise<Tuple2<ActorGateway, Integer>> leaderGatewayPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
- synchronized (waitLock) {
- leaderGatewayPortFuture = leaderGatewayPortPromise.future();
- waitLock.notifyAll();
- }
-
- LOG.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
-
- AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, lookupTimeout)
- // Resolve the actor ref
- .flatMap(new Mapper<ActorRef, Future<Tuple2<ActorGateway, Object>>>() {
- @Override
- public Future<Tuple2<ActorGateway, Object>> apply(ActorRef jobManagerRef) {
- ActorGateway leaderGateway = new AkkaActorGateway(
- jobManagerRef, leaderSessionID);
-
- Future<Object> webMonitorPort = leaderGateway.ask(
- JobManagerMessages.getRequestWebMonitorPort(),
- timeout);
-
- return Futures.successful(leaderGateway).zip(webMonitorPort);
- }
- }, actorSystem.dispatcher())
- // Request the web monitor port
- .onComplete(new OnComplete<Tuple2<ActorGateway, Object>>() {
- @Override
- public void onComplete(Throwable failure, Tuple2<ActorGateway, Object> success) throws Throwable {
- if (failure == null) {
- if (success._2() instanceof ResponseWebMonitorPort) {
- int webMonitorPort = ((ResponseWebMonitorPort) success._2()).port();
-
- leaderGatewayPortPromise.success(new Tuple2<>(success._1(), webMonitorPort));
- } else {
- leaderGatewayPortPromise.failure(new Exception("Received the message " +
- success._2() + " as response to " + JobManagerMessages.getRequestWebMonitorPort() +
- ". But a message of type " + ResponseWebMonitorPort.class + " was expected."));
- }
- } else {
- LOG.warn("Failed to retrieve leader gateway and port.", failure);
- leaderGatewayPortPromise.failure(failure);
- }
- }
- }, actorSystem.dispatcher());
- }
- catch (Exception e) {
- handleError(e);
- }
- }
- }
-
- @Override
- public void handleError(Exception exception) {
- LOG.error("Received error from LeaderRetrievalService.", exception);
-
- try {
- // stop associated webMonitor
- webMonitor.stop();
- }
- catch (Exception e) {
- LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 4777202..35d13dd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -18,9 +18,11 @@
package org.apache.flink.runtime.webmonitor;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -43,9 +45,7 @@ import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -72,8 +72,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
WebMonitorConfig cfg,
RequestHandler handler,
JobManagerRetriever retriever,
- Future<String> localJobManagerAddressFuture,
- FiniteDuration timeout,
+ CompletableFuture<String> localJobManagerAddressFuture,
+ Time timeout,
boolean httpsEnabled) {
super(retriever, localJobManagerAddressFuture, timeout, httpsEnabled);
@@ -87,7 +87,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
}
@Override
- protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
+ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
FullHttpResponse response;
try {
@@ -106,7 +106,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
queryParams.put(WEB_MONITOR_ADDRESS_KEY,
(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
- response = handler.handleRequest(pathParams, queryParams, jobManager);
+ response = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
}
catch (NotFoundException e) {
// this should result in a 404 error code (not found)
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index d524632..4cb55f1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -18,9 +18,11 @@
package org.apache.flink.runtime.webmonitor;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -29,11 +31,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -48,9 +48,9 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
private final JobManagerRetriever retriever;
- protected final Future<String> localJobManagerAddressFuture;
+ protected final CompletableFuture<String> localJobManagerAddressFuture;
- protected final FiniteDuration timeout;
+ protected final Time timeout;
/** Whether the web service has https enabled. */
protected final boolean httpsEnabled;
@@ -59,8 +59,8 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
public RuntimeMonitorHandlerBase(
JobManagerRetriever retriever,
- Future<String> localJobManagerAddressFuture,
- FiniteDuration timeout,
+ CompletableFuture<String> localJobManagerAddressFuture,
+ Time timeout,
boolean httpsEnabled) {
this.retriever = checkNotNull(retriever);
@@ -78,17 +78,17 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
@Override
protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
- if (localJobManagerAddressFuture.isCompleted()) {
+ if (localJobManagerAddressFuture.isDone()) {
if (localJobManagerAddress == null) {
- localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
+ localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
- Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
- if (jobManager.isDefined()) {
- Tuple2<ActorGateway, Integer> gatewayPort = jobManager.get();
+ if (optJobManagerGateway.isPresent()) {
+ JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
- localJobManagerAddress, gatewayPort);
+ localJobManagerAddress, jobManagerGateway, timeout);
if (redirectAddress != null) {
HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(),
@@ -96,7 +96,7 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
KeepAliveWrite.flush(ctx, routed.request(), redirect);
}
else {
- respondAsLeader(ctx, routed, gatewayPort._1());
+ respondAsLeader(ctx, routed, jobManagerGateway);
}
} else {
KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
@@ -106,5 +106,5 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
}
}
- protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager);
+ protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e27a15f..17f02f0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -71,12 +70,14 @@ import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
-import akka.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,16 +86,11 @@ import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -108,7 +104,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class WebRuntimeMonitor implements WebMonitor {
/** By default, all requests to the JobManager have a timeout of 10 seconds. */
- public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+ public static final Time DEFAULT_REQUEST_TIMEOUT = Time.seconds(10L);
/** Logger for web frontend startup / shutdown messages. */
private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
@@ -120,14 +116,15 @@ public class WebRuntimeMonitor implements WebMonitor {
private final LeaderRetrievalService leaderRetrievalService;
- /** LeaderRetrievalListener which stores the currently leading JobManager and its archive. */
+ /** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
private final JobManagerRetriever retriever;
private final SSLContext serverSSLContext;
- private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+ private final CompletableFuture<String> jobManagerAddressFuture = new CompletableFuture<>();
+
+ private final Time timeout;
- private final FiniteDuration timeout;
private final WebFrontendBootstrap netty;
private final File webRootDir;
@@ -142,7 +139,6 @@ public class WebRuntimeMonitor implements WebMonitor {
private AtomicBoolean cleanedUp = new AtomicBoolean();
- private ExecutorService executorService;
private MetricFetcher metricFetcher;
@@ -150,11 +146,15 @@ public class WebRuntimeMonitor implements WebMonitor {
Configuration config,
LeaderRetrievalService leaderRetrievalService,
BlobView blobView,
- ActorSystem actorSystem) throws IOException, InterruptedException {
+ JobManagerRetriever jobManagerRetriever,
+ MetricQueryServiceRetriever queryServiceRetriever,
+ Time timeout,
+ Executor executor) throws IOException, InterruptedException {
this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
- this.timeout = AkkaUtils.getTimeout(config);
- this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout);
+ this.retriever = Preconditions.checkNotNull(jobManagerRetriever);
+ this.timeout = Preconditions.checkNotNull(timeout);
+
this.cfg = new WebMonitorConfig(config);
final String configuredAddress = cfg.getWebFrontendAddress();
@@ -191,7 +191,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// - Back pressure stats ----------------------------------------------
- stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
+ stackTraceSamples = new StackTraceSampleCoordinator(executor, 60000);
// Back pressure stats tracker config
int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
@@ -209,10 +209,6 @@ public class WebRuntimeMonitor implements WebMonitor {
// --------------------------------------------------------------------
- executorService = new ForkJoinPool();
-
- ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService);
-
// Config to enable https access to the web-ui
boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config);
@@ -226,11 +222,11 @@ public class WebRuntimeMonitor implements WebMonitor {
} else {
serverSSLContext = null;
}
- metricFetcher = new MetricFetcher(actorSystem, retriever, context);
+ metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, executor, timeout);
String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY);
- JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, context, defaultSavepointDir);
+ JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, executor, defaultSavepointDir);
RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());
RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler());
@@ -274,8 +270,8 @@ public class WebRuntimeMonitor implements WebMonitor {
get(router,
new TaskManagerLogHandler(
retriever,
- context,
- jobManagerAddressPromise.future(),
+ executor,
+ jobManagerAddressFuture,
timeout,
TaskManagerLogHandler.FileMode.LOG,
config,
@@ -284,8 +280,8 @@ public class WebRuntimeMonitor implements WebMonitor {
get(router,
new TaskManagerLogHandler(
retriever,
- context,
- jobManagerAddressPromise.future(),
+ executor,
+ jobManagerAddressFuture,
timeout,
TaskManagerLogHandler.FileMode.STDOUT,
config,
@@ -296,27 +292,27 @@ public class WebRuntimeMonitor implements WebMonitor {
router
// log and stdout
.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
- new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile,
+ new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.logFile,
enableSSL))
.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
- new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
+ new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
enableSSL));
get(router, new JobManagerMetricsHandler(metricFetcher));
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
- get(router, new JobCancellationHandler());
+ get(router, new JobCancellationHandler(timeout));
// DELETE is the preferred way of canceling a job (Rest-conform)
- delete(router, new JobCancellationHandler());
+ delete(router, new JobCancellationHandler(timeout));
get(router, triggerHandler);
get(router, inProgressHandler);
// stop a job via GET (for proper integration with YARN this has to be performed via GET)
- get(router, new JobStoppingHandler());
+ get(router, new JobStoppingHandler(timeout));
// DELETE is the preferred way of stopping a job (Rest-conform)
- delete(router, new JobStoppingHandler());
+ delete(router, new JobStoppingHandler(timeout));
int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
@@ -351,7 +347,7 @@ public class WebRuntimeMonitor implements WebMonitor {
}
// this handler serves all the static contents
- router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir,
+ router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, webRootDir,
enableSSL));
// add shutdown hook for deleting the directories and remaining temp files on shutdown
@@ -387,7 +383,7 @@ public class WebRuntimeMonitor implements WebMonitor {
* @return array of all JsonArchivists relevant for the history server
*/
public static JsonArchivist[] getJsonArchivists() {
- JsonArchivist[] archivists = new JsonArchivist[]{
+ JsonArchivist[] archivists = {
new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
new JobPlanHandler.JobPlanJsonArchivist(),
@@ -418,7 +414,7 @@ public class WebRuntimeMonitor implements WebMonitor {
LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort());
synchronized (startupShutdownLock) {
- jobManagerAddressPromise.success(jobManagerAkkaUrl);
+ jobManagerAddressFuture.complete(jobManagerAkkaUrl);
leaderRetrievalService.start(retriever);
long delay = backPressureStatsTracker.getCleanUpInterval();
@@ -451,8 +447,6 @@ public class WebRuntimeMonitor implements WebMonitor {
backPressureStatsTracker.shutDown();
- executorService.shutdownNow();
-
cleanup();
}
}
@@ -522,7 +516,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// ------------------------------------------------------------------------
private RuntimeMonitorHandler handler(RequestHandler handler) {
- return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressPromise.future(), timeout,
+ return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressFuture, timeout,
serverSSLContext != null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index be6928e..15acb00 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -26,9 +26,10 @@ package org.apache.flink.runtime.webmonitor.files;
* https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
*****************************************************************************/
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -70,13 +71,9 @@ import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;
+import java.util.Optional;
import java.util.TimeZone;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
@@ -118,9 +115,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
private final JobManagerRetriever retriever;
- private final Future<String> localJobManagerAddressFuture;
+ private final CompletableFuture<String> localJobManagerAddressFuture;
- private final FiniteDuration timeout;
+ private final Time timeout;
/** The path in which the static documents are. */
private final File rootPath;
@@ -135,8 +132,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
public StaticFileServerHandler(
JobManagerRetriever retriever,
- Future<String> localJobManagerAddressPromise,
- FiniteDuration timeout,
+ CompletableFuture<String> localJobManagerAddressPromise,
+ Time timeout,
File rootPath,
boolean httpsEnabled) throws IOException {
@@ -145,8 +142,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
public StaticFileServerHandler(
JobManagerRetriever retriever,
- Future<String> localJobManagerAddressFuture,
- FiniteDuration timeout,
+ CompletableFuture<String> localJobManagerAddressFuture,
+ Time timeout,
File rootPath,
boolean httpsEnabled,
Logger logger) throws IOException {
@@ -165,9 +162,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
@Override
public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
- if (localJobManagerAddressFuture.isCompleted()) {
+ if (localJobManagerAddressFuture.isDone()) {
if (localJobManagerAddress == null) {
- localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
+ localJobManagerAddress = localJobManagerAddressFuture.get();
}
final HttpRequest request = routed.request();
@@ -183,12 +180,12 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
requestPath = "";
}
- Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
- if (jobManager.isDefined()) {
+ if (optJobManagerGateway.isPresent()) {
// Redirect to leader if necessary
String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
- localJobManagerAddress, jobManager.get());
+ localJobManagerAddress, optJobManagerGateway.get(), timeout);
if (redirectAddress != null) {
HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index d6c17af..89108db 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import java.util.Map;
+import java.util.Optional;
/**
* Base class for request handlers whose response depends on an ExecutionGraph
@@ -35,11 +38,11 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
private final ExecutionGraphHolder executionGraphHolder;
public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
- this.executionGraphHolder = executionGraphHolder;
+ this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
String jidString = pathParams.get("jobid");
if (jidString == null) {
throw new RuntimeException("JobId parameter missing");
@@ -53,12 +56,17 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
}
- AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
- if (eg == null) {
- throw new NotFoundException("Could not find job with id " + jid);
+ final Optional<AccessExecutionGraph> optGraph;
+
+ try {
+ optGraph = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
+ } catch (Exception e) {
+ throw new FlinkException("Could not retrieve ExecutionGraph for job with jobId " + jid + " from the JobManager.", e);
}
- return handleRequest(eg, pathParams);
+ final AccessExecutionGraph graph = optGraph.orElseThrow(() -> new NotFoundException("Could not find job with jobId " + jid + '.'));
+
+ return handleRequest(graph, pathParams);
}
public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
index 2b4a45f..266ffb0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -38,8 +38,8 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
private static final Charset ENCODING = Charset.forName("UTF-8");
@Override
- public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
- String result = handleJsonRequest(pathParams, queryParams, jobManager);
+ public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+ String result = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
byte[] bytes = result.getBytes(ENCODING);
DefaultFullHttpResponse response = new DefaultFullHttpResponse(
@@ -57,7 +57,7 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
*
* @param pathParams The map of REST path parameters, decoded by the router.
* @param queryParams The map of query parameters.
- * @param jobManager The JobManager actor.
+ * @param jobManagerGateway The gateway used to communicate with the JobManager.
*
* @return The JSON string that is the HTTP response.
*
@@ -69,6 +69,6 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
public abstract String handleJsonRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
- ActorGateway jobManager) throws Exception;
+ JobManagerGateway jobManagerGateway) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 816ef24..4ebc4e7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -27,10 +27,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,9 +44,9 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
- private final FiniteDuration timeout;
+ private final Time timeout;
- public ClusterOverviewHandler(FiniteDuration timeout) {
+ public ClusterOverviewHandler(Time timeout) {
this.timeout = checkNotNull(timeout);
}
@@ -58,12 +56,13 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
// we need no parameters, get all requests
try {
- if (jobManager != null) {
- Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
- StatusOverview overview = (StatusOverview) Await.result(future, timeout);
+ if (jobManagerGateway != null) {
+ CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
+
+ StatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 9d0b863..778a300 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -19,18 +19,16 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static java.util.Objects.requireNonNull;
@@ -43,9 +41,9 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
- private final FiniteDuration timeout;
+ private final Time timeout;
- public CurrentJobIdsHandler(FiniteDuration timeout) {
+ public CurrentJobIdsHandler(Time timeout) {
this.timeout = requireNonNull(timeout);
}
@@ -55,12 +53,12 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
// we need no parameters, get all requests
try {
- if (jobManager != null) {
- Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
- JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout);
+ if (jobManagerGateway != null) {
+ CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+ JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index d0518c8..b324426 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -35,10 +35,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,13 +49,13 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
- private final FiniteDuration timeout;
+ private final Time timeout;
private final boolean includeRunningJobs;
private final boolean includeFinishedJobs;
public CurrentJobsOverviewHandler(
- FiniteDuration timeout,
+ Time timeout,
boolean includeRunningJobs,
boolean includeFinishedJobs) {
@@ -79,13 +77,11 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
- if (jobManager != null) {
- Future<Object> future = jobManager.ask(
- new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
-
- MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
+ if (jobManagerGateway != null) {
+ CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
+ MultipleJobsDetails result = jobDetailsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
final long now = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 312c890..fe1d06b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.util.EnvironmentInformation;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -55,7 +55,7 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
return this.configString;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 9fbafb8..e27d125 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -18,9 +18,10 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -30,13 +31,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -49,35 +45,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class HandlerRedirectUtils {
- private static final Logger LOG = LoggerFactory.getLogger(HandlerRedirectUtils.class);
-
- /** Pattern to extract the host from an remote Akka URL. */
- private static final Pattern LeaderAddressHostPattern = Pattern.compile("^.+@(.+):([0-9]+)/user/.+$");
-
public static String getRedirectAddress(
String localJobManagerAddress,
- Tuple2<ActorGateway, Integer> leader) throws Exception {
+ JobManagerGateway jobManagerGateway,
+ Time timeout) throws Exception {
- final String leaderAddress = leader._1().path();
- final int webMonitorPort = leader._2();
+ final String leaderAddress = jobManagerGateway.getAddress();
final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
if (!localJobManagerAddress.equals(leaderAddress) &&
!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
// We are not the leader and need to redirect
- Matcher matcher = LeaderAddressHostPattern.matcher(leaderAddress);
-
- if (matcher.matches()) {
- String redirectAddress = String.format("%s:%d", matcher.group(1), webMonitorPort);
- return redirectAddress;
- }
- else {
- LOG.warn("Unexpected leader address pattern {}. Cannot extract host.", leaderAddress);
- }
- }
+ final String hostname = jobManagerGateway.getHostname();
- return null;
+ final CompletableFuture<Integer> webMonitorPortFuture = jobManagerGateway.requestWebPort(timeout);
+ final int webMonitorPort = webMonitorPortFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ return String.format("%s:%d", hostname, webMonitorPort);
+ } else {
+ return null;
+ }
}
public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 4a21fec..db55169 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import java.util.Map;
@@ -42,7 +42,7 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
return ERROR_MESSAGE;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 2572a76..73771bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -46,7 +46,7 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
final String file = pathParams.get("jarid");
try {
File[] list = jarDir.listFiles(new FilenameFilter() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4dd20b1..4f9b188 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -51,7 +51,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 1b25e7f..b239160 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -45,7 +45,7 @@ public class JarPlanHandler extends JarActionHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
JobGraph graph = getJobGraphAndClassLoader(config).f0;
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 282fea8..12ffa4f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -34,8 +34,6 @@ import java.io.File;
import java.io.StringWriter;
import java.util.Map;
-import scala.concurrent.duration.FiniteDuration;
-
/**
* This handler handles requests to fetch plan for a jar.
*/
@@ -43,13 +41,13 @@ public class JarRunHandler extends JarActionHandler {
static final String JAR_RUN_REST_PATH = "/jars/:jarid/run";
- private final FiniteDuration timeout;
+ private final Time timeout;
private final Configuration clientConfig;
- public JarRunHandler(File jarDirectory, FiniteDuration timeout, Configuration clientConfig) {
+ public JarRunHandler(File jarDirectory, Time timeout, Configuration clientConfig) {
super(jarDirectory);
- this.timeout = timeout;
- this.clientConfig = clientConfig;
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.clientConfig = Preconditions.checkNotNull(clientConfig);
}
@Override
@@ -58,17 +56,17 @@ public class JarRunHandler extends JarActionHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
try {
JobClient.submitJobDetached(
- new AkkaJobManagerGateway(jobManager),
+ jobManagerGateway,
clientConfig,
graph.f0,
- Time.milliseconds(timeout.toMillis()),
+ timeout,
graph.f1);
} catch (JobExecutionException e) {
throw new ProgramInvocationException("Failed to submit the job to the job manager", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 745a110..705c321 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import java.io.File;
import java.util.Map;
@@ -44,9 +44,9 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
@Override
public String handleJsonRequest(
- Map<String, String> pathParams,
- Map<String, String> queryParams,
- ActorGateway jobManager) throws Exception {
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ JobManagerGateway jobManagerGateway) throws Exception {
String tempFilePath = queryParams.get("filepath");
String filename = queryParams.get("filename");
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index d9de7d7..513dc08 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -19,8 +19,9 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
@@ -33,17 +34,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel";
private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
+ private final Time timeout;
+
+ public JobCancellationHandler(Time timeout) {
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+
@Override
public String[] getPaths() {
return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH};
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
- JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
- if (jobManager != null) {
- jobManager.tell(new JobManagerMessages.CancelJob(jobid));
+ JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+ if (jobManagerGateway != null) {
+ jobManagerGateway.cancelJob(jobId, timeout);
return "{}";
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 7dd4a52..9b474aa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -19,16 +19,17 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -37,7 +38,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import akka.dispatch.OnComplete;
import com.fasterxml.jackson.core.JsonGenerator;
import javax.annotation.Nullable;
@@ -48,10 +48,9 @@ import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
-
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -92,16 +91,16 @@ public class JobCancellationWithSavepointHandlers {
public JobCancellationWithSavepointHandlers(
ExecutionGraphHolder currentGraphs,
- ExecutionContext executionContext) {
- this(currentGraphs, executionContext, null);
+ Executor executor) {
+ this(currentGraphs, executor, null);
}
public JobCancellationWithSavepointHandlers(
ExecutionGraphHolder currentGraphs,
- ExecutionContext executionContext,
+ Executor executor,
@Nullable String defaultSavepointDirectory) {
- this.triggerHandler = new TriggerHandler(currentGraphs, executionContext);
+ this.triggerHandler = new TriggerHandler(currentGraphs, executor);
this.inProgressHandler = new InProgressHandler();
this.defaultSavepointDirectory = defaultSavepointDirectory;
}
@@ -127,11 +126,11 @@ public class JobCancellationWithSavepointHandlers {
private final ExecutionGraphHolder currentGraphs;
/** Execution context for futures. */
- private final ExecutionContext executionContext;
+ private final Executor executor;
- public TriggerHandler(ExecutionGraphHolder currentGraphs, ExecutionContext executionContext) {
+ public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) {
this.currentGraphs = checkNotNull(currentGraphs);
- this.executionContext = checkNotNull(executionContext);
+ this.executor = checkNotNull(executor);
}
@Override
@@ -144,35 +143,40 @@ public class JobCancellationWithSavepointHandlers {
public FullHttpResponse handleRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
- ActorGateway jobManager) throws Exception {
+ JobManagerGateway jobManagerGateway) throws Exception {
try {
- if (jobManager != null) {
+ if (jobManagerGateway != null) {
JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+ final Optional<AccessExecutionGraph> optGraph;
- AccessExecutionGraph graph = currentGraphs.getExecutionGraph(jobId, jobManager);
- if (graph == null) {
- throw new Exception("Cannot find ExecutionGraph for job.");
- } else {
- CheckpointCoordinator coord = graph.getCheckpointCoordinator();
- if (coord == null) {
- throw new Exception("Cannot find CheckpointCoordinator for job.");
- }
+ try {
+ optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
+ } catch (Exception e) {
+ throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
+ }
- String targetDirectory = pathParams.get("targetDirectory");
- if (targetDirectory == null) {
- if (defaultSavepointDirectory == null) {
- throw new IllegalStateException("No savepoint directory configured. " +
- "You can either specify a directory when triggering this savepoint or " +
- "configure a cluster-wide default via key '" +
- CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
- } else {
- targetDirectory = defaultSavepointDirectory;
- }
- }
+ final AccessExecutionGraph graph = optGraph.orElseThrow(
+ () -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
- return handleNewRequest(jobManager, jobId, targetDirectory, coord.getCheckpointTimeout());
+ CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+ if (coord == null) {
+ throw new Exception("Cannot find CheckpointCoordinator for job.");
}
+
+ String targetDirectory = pathParams.get("targetDirectory");
+ if (targetDirectory == null) {
+ if (defaultSavepointDirectory == null) {
+ throw new IllegalStateException("No savepoint directory configured. " +
+ "You can either specify a directory when triggering this savepoint or " +
+ "configure a cluster-wide default via key '" +
+ CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
+ } else {
+ targetDirectory = defaultSavepointDirectory;
+ }
+ }
+
+ return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
} else {
throw new Exception("No connection to the leading JobManager.");
}
@@ -182,7 +186,7 @@ public class JobCancellationWithSavepointHandlers {
}
@SuppressWarnings("unchecked")
- private FullHttpResponse handleNewRequest(ActorGateway jobManager, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
+ private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
// Check whether a request exists
final long requestId;
final boolean isNewRequest;
@@ -202,35 +206,21 @@ public class JobCancellationWithSavepointHandlers {
try {
// Trigger cancellation
- Object msg = new CancelJobWithSavepoint(jobId, targetDirectory);
- Future<Object> cancelFuture = jobManager
- .ask(msg, FiniteDuration.apply(checkpointTimeout, "ms"));
-
- cancelFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object resp) throws Throwable {
- synchronized (lock) {
- try {
- if (resp != null) {
- if (resp.getClass() == CancellationSuccess.class) {
- String path = ((CancellationSuccess) resp).savepointPath();
- completed.put(requestId, path);
- } else if (resp.getClass() == CancellationFailure.class) {
- Throwable cause = ((CancellationFailure) resp).cause();
- completed.put(requestId, cause);
- } else {
- Throwable cause = new IllegalStateException("Unexpected CancellationResponse of type " + resp.getClass());
- completed.put(requestId, cause);
- }
- } else {
- completed.put(requestId, failure);
- }
- } finally {
- inProgress.remove(jobId);
+ CompletableFuture<String> cancelJobFuture = jobManagerGateway
+ .cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout));
+
+ cancelJobFuture.whenCompleteAsync(
+ (String path, Throwable throwable) -> {
+ try {
+ if (throwable != null) {
+ completed.put(requestId, throwable);
+ } else {
+ completed.put(requestId, path);
}
+ } finally {
+ inProgress.remove(jobId);
}
- }
- }, executionContext);
+ }, executor);
success = true;
} finally {
@@ -298,9 +288,9 @@ public class JobCancellationWithSavepointHandlers {
@Override
@SuppressWarnings("unchecked")
- public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
- if (jobManager != null) {
+ if (jobManagerGateway != null) {
JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
long requestId = Long.parseLong(pathParams.get("requestId"));