You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/11 11:49:01 UTC
[4/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway
[FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway
This PR decouples the WebRuntimeMonitor from the ActorGateway by introducing
the JobManagerGateway interface which can have multiple implementations. This
is a preliminary step for the integration of the existing WebRuntimeMonitor
with the Flip-6 JobMaster.
Add time unit for web.timeout
This closes #4492.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f790d3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f790d3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f790d3e
Branch: refs/heads/master
Commit: 9f790d3efe5e8267da05eb97bbe07ca8a0f859fe
Parents: 00d5b62
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 2 18:43:00 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Aug 11 13:48:14 2017 +0200
----------------------------------------------------------------------
docs/ops/config.md | 2 +
.../apache/flink/configuration/WebOptions.java | 7 +
.../MesosApplicationMasterRunner.java | 13 +-
.../webmonitor/ExecutionGraphHolder.java | 59 ++----
.../runtime/webmonitor/JobManagerRetriever.java | 197 ------------------
.../webmonitor/RuntimeMonitorHandler.java | 16 +-
.../webmonitor/RuntimeMonitorHandlerBase.java | 36 ++--
.../runtime/webmonitor/WebRuntimeMonitor.java | 74 ++++---
.../files/StaticFileServerHandler.java | 35 ++--
.../AbstractExecutionGraphRequestHandler.java | 22 +-
.../handlers/AbstractJsonRequestHandler.java | 10 +-
.../handlers/ClusterOverviewHandler.java | 23 +--
.../handlers/CurrentJobIdsHandler.java | 22 +-
.../handlers/CurrentJobsOverviewHandler.java | 24 +--
.../handlers/DashboardConfigHandler.java | 4 +-
.../handlers/HandlerRedirectUtils.java | 41 ++--
.../handlers/JarAccessDeniedHandler.java | 4 +-
.../webmonitor/handlers/JarDeleteHandler.java | 4 +-
.../webmonitor/handlers/JarListHandler.java | 4 +-
.../webmonitor/handlers/JarPlanHandler.java | 4 +-
.../webmonitor/handlers/JarRunHandler.java | 20 +-
.../webmonitor/handlers/JarUploadHandler.java | 8 +-
.../handlers/JobCancellationHandler.java | 19 +-
.../JobCancellationWithSavepointHandlers.java | 124 ++++++------
.../handlers/JobManagerConfigHandler.java | 4 +-
.../webmonitor/handlers/JobStoppingHandler.java | 19 +-
.../webmonitor/handlers/RequestHandler.java | 9 +-
.../handlers/TaskManagerLogHandler.java | 66 +++---
.../handlers/TaskManagersHandler.java | 39 ++--
.../CheckpointStatsDetailsSubtasksHandler.java | 6 +-
.../metrics/AbstractMetricsHandler.java | 4 +-
.../webmonitor/metrics/MetricFetcher.java | 202 +++++++++----------
.../runtime/webmonitor/WebFrontendITCase.java | 27 +--
.../webmonitor/WebRuntimeMonitorITCase.java | 73 ++++---
.../handlers/ClusterOverviewHandlerTest.java | 8 +-
.../handlers/CurrentJobIdsHandlerTest.java | 8 +-
.../CurrentJobsOverviewHandlerTest.java | 10 +-
.../handlers/HandlerRedirectUtilsTest.java | 39 ++--
.../webmonitor/handlers/JarRunHandlerTest.java | 5 +-
.../handlers/JobAccumulatorsHandlerTest.java | 5 +-
.../handlers/JobCancellationHandlerTest.java | 4 +-
...obCancellationWithSavepointHandlersTest.java | 82 ++++----
.../handlers/JobConfigHandlerTest.java | 5 +-
.../handlers/JobDetailsHandlerTest.java | 5 +-
.../handlers/JobExceptionsHandlerTest.java | 5 +-
.../webmonitor/handlers/JobPlanHandlerTest.java | 5 +-
.../handlers/JobStoppingHandlerTest.java | 7 +-
.../JobVertexAccumulatorsHandlerTest.java | 5 +-
.../JobVertexBackPressureHandlerTest.java | 2 +-
.../handlers/JobVertexDetailsHandlerTest.java | 5 +-
.../JobVertexTaskManagersHandlerTest.java | 5 +-
...SubtaskCurrentAttemptDetailsHandlerTest.java | 6 +-
...ExecutionAttemptAccumulatorsHandlerTest.java | 5 +-
...btaskExecutionAttemptDetailsHandlerTest.java | 5 +-
.../SubtasksAllAccumulatorsHandlerTest.java | 5 +-
.../handlers/SubtasksTimesHandlerTest.java | 5 +-
.../handlers/TaskManagerLogHandlerTest.java | 54 ++---
.../handlers/TaskManagersHandlerTest.java | 7 +-
.../metrics/AbstractMetricsHandlerTest.java | 26 ++-
.../metrics/JobManagerMetricsHandlerTest.java | 20 +-
.../metrics/JobMetricsHandlerTest.java | 20 +-
.../metrics/JobVertexMetricsHandlerTest.java | 20 +-
.../webmonitor/metrics/MetricFetcherTest.java | 111 ++++------
.../metrics/TaskManagerMetricsHandlerTest.java | 20 +-
.../runtime/akka/AkkaJobManagerGateway.java | 190 +++++++++++++++--
.../apache/flink/runtime/client/JobClient.java | 6 +-
.../clusterframework/BootstrapTools.java | 34 ++--
.../flink/runtime/concurrent/FutureUtils.java | 10 +
.../runtime/jobmaster/JobManagerGateway.java | 122 ++++++++++-
.../metrics/dump/MetricDumpSerialization.java | 1 +
.../runtime/webmonitor/WebMonitorUtils.java | 29 ++-
.../retriever/JobManagerRetriever.java | 123 +++++++++++
.../retriever/MetricQueryServiceGateway.java | 36 ++++
.../retriever/MetricQueryServiceRetriever.java | 35 ++++
.../retriever/impl/AkkaJobManagerRetriever.java | 69 +++++++
.../retriever/impl/AkkaQueryServiceGateway.java | 53 +++++
.../impl/AkkaQueryServiceRetriever.java | 51 +++++
.../retriever/impl/RpcJobManagerRetriever.java | 46 +++++
.../flink/runtime/jobmanager/JobManager.scala | 10 +-
.../runtime/minicluster/FlinkMiniCluster.scala | 10 +-
.../runtime/testingUtils/TestingUtils.scala | 2 +
.../flink/yarn/YarnApplicationMasterRunner.java | 12 +-
82 files changed, 1551 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index c8d5c92..4138b4d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -389,6 +389,8 @@ These parameters allow for advanced tuning. The default values are sufficient wh
- `web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.
+- `web.timeout`: Timeout, in milliseconds, for asynchronous operations executed by the web frontend (DEFAULT: `10000`, i.e. 10 s).
+
### File Systems
The parameters define the behavior of tasks that create result files.
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index f499045..3733244 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -149,6 +149,13 @@ public class WebOptions {
.defaultValue(50)
.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples");
+ /**
+ * Timeout for asynchronous operations by the WebRuntimeMonitor in milliseconds.
+ */
+ public static final ConfigOption<Long> TIMEOUT = ConfigOptions
+ .key("web.timeout")
+ .defaultValue(10L * 1000L);
+
private WebOptions() {
throw new IllegalAccessError();
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 260b7f3..7891386 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,10 +18,12 @@
package org.apache.flink.mesos.runtime.clusterframework;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
@@ -52,6 +54,8 @@ import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -319,11 +323,16 @@ public class MesosApplicationMasterRunner {
// 2: the web monitor
LOG.debug("Starting Web Frontend");
+ Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
+
webMonitor = BootstrapTools.startWebMonitorIfConfigured(
config,
highAvailabilityServices,
- actorSystem,
- jobManager,
+ new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
+ new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
+ webMonitorTimeout,
+ futureExecutor,
+ AkkaUtils.getAkkaURL(actorSystem, jobManager),
LOG);
if (webMonitor != null) {
final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 75b0475..739b375 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -19,20 +19,19 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
import java.util.WeakHashMap;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -48,7 +47,7 @@ public class ExecutionGraphHolder {
private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
- private final FiniteDuration timeout;
+ private final Time timeout;
private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
@@ -56,50 +55,36 @@ public class ExecutionGraphHolder {
this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}
- public ExecutionGraphHolder(FiniteDuration timeout) {
+ public ExecutionGraphHolder(Time timeout) {
this.timeout = checkNotNull(timeout);
}
/**
- * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
+ * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or
+ * {@link Optional#empty()} if it cannot be found.
*
* @param jid jobID of the execution graph to be retrieved
- * @return the retrieved execution graph or null if it is not retrievable
+ * @return an Optional containing the ExecutionGraph if it could be retrieved, or an empty Optional if no ExecutionGraph was found
+ * @throws Exception if the ExecutionGraph retrieval failed.
*/
- public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
+ public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception {
AccessExecutionGraph cached = cache.get(jid);
if (cached != null) {
if (cached.getState() == JobStatus.SUSPENDED) {
cache.remove(jid);
} else {
- return cached;
+ return Optional.of(cached);
}
}
- try {
- if (jobManager != null) {
- Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
- Object result = Await.result(future, timeout);
-
- if (result instanceof JobManagerMessages.JobNotFound) {
- return null;
- }
- else if (result instanceof JobManagerMessages.JobFound) {
- AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
- cache.put(jid, eg);
- return eg;
- }
- else {
- throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
- }
- }
- else {
- LOG.warn("No connection to the leading JobManager.");
- return null;
- }
- }
- catch (Exception e) {
- throw new RuntimeException("Error requesting execution graph", e);
- }
+ CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
+
+ Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ return result.map((executionGraph) -> {
+ cache.put(jid, executionGraph);
+
+ return executionGraph;
+ });
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
deleted file mode 100644
index 175a4b8..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.ResponseWebMonitorPort;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Retrieves and stores the actor gateway to the current leading JobManager. In case of an error,
- * the {@link WebRuntimeMonitor} to which this instance is associated will be stopped.
- *
- * <p>The job manager gateway only works if the web monitor and the job manager run in the same
- * actor system, because many execution graph structures are not serializable. This breaks the nice
- * leader retrieval abstraction and we have a special code path in case that another job manager is
- * leader (see {@link org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils}. In such a
- * case, we get the address of the web monitor of the leading job manager and redirect to it
- * (instead of directly communicating with it).
- */
-public class JobManagerRetriever implements LeaderRetrievalListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(JobManagerRetriever.class);
-
- private final Object waitLock = new Object();
-
- private final WebMonitor webMonitor;
- private final ActorSystem actorSystem;
- private final FiniteDuration lookupTimeout;
- private final FiniteDuration timeout;
-
- private volatile Future<Tuple2<ActorGateway, Integer>> leaderGatewayPortFuture;
-
- public JobManagerRetriever(
- WebMonitor webMonitor,
- ActorSystem actorSystem,
- FiniteDuration lookupTimeout,
- FiniteDuration timeout) {
-
- this.webMonitor = checkNotNull(webMonitor);
- this.actorSystem = checkNotNull(actorSystem);
- this.lookupTimeout = checkNotNull(lookupTimeout);
- this.timeout = checkNotNull(timeout);
- }
-
- /**
- * Returns the currently known leading job manager gateway and its web monitor port.
- */
- public Option<Tuple2<ActorGateway, Integer>> getJobManagerGatewayAndWebPort() throws Exception {
- if (leaderGatewayPortFuture != null) {
- Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = leaderGatewayPortFuture;
-
- if (gatewayPortFuture.isCompleted()) {
- Tuple2<ActorGateway, Integer> gatewayPort = Await.result(gatewayPortFuture, timeout);
-
- return Option.apply(gatewayPort);
- } else {
- return Option.empty();
- }
- } else {
- return Option.empty();
- }
- }
-
- /**
- * Awaits the leading job manager gateway and its web monitor port.
- */
- public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception {
- Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null;
- Deadline deadline = timeout.fromNow();
-
- while (!deadline.isOverdue()) {
- synchronized (waitLock) {
- gatewayPortFuture = leaderGatewayPortFuture;
-
- if (gatewayPortFuture != null) {
- break;
- }
-
- waitLock.wait(deadline.timeLeft().toMillis());
- }
- }
-
- if (gatewayPortFuture == null) {
- throw new TimeoutException("There is no JobManager available.");
- } else {
- return Await.result(gatewayPortFuture, deadline.timeLeft());
- }
- }
-
- @Override
- public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
- if (leaderAddress != null && !leaderAddress.equals("")) {
- try {
- final Promise<Tuple2<ActorGateway, Integer>> leaderGatewayPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
- synchronized (waitLock) {
- leaderGatewayPortFuture = leaderGatewayPortPromise.future();
- waitLock.notifyAll();
- }
-
- LOG.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
-
- AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, lookupTimeout)
- // Resolve the actor ref
- .flatMap(new Mapper<ActorRef, Future<Tuple2<ActorGateway, Object>>>() {
- @Override
- public Future<Tuple2<ActorGateway, Object>> apply(ActorRef jobManagerRef) {
- ActorGateway leaderGateway = new AkkaActorGateway(
- jobManagerRef, leaderSessionID);
-
- Future<Object> webMonitorPort = leaderGateway.ask(
- JobManagerMessages.getRequestWebMonitorPort(),
- timeout);
-
- return Futures.successful(leaderGateway).zip(webMonitorPort);
- }
- }, actorSystem.dispatcher())
- // Request the web monitor port
- .onComplete(new OnComplete<Tuple2<ActorGateway, Object>>() {
- @Override
- public void onComplete(Throwable failure, Tuple2<ActorGateway, Object> success) throws Throwable {
- if (failure == null) {
- if (success._2() instanceof ResponseWebMonitorPort) {
- int webMonitorPort = ((ResponseWebMonitorPort) success._2()).port();
-
- leaderGatewayPortPromise.success(new Tuple2<>(success._1(), webMonitorPort));
- } else {
- leaderGatewayPortPromise.failure(new Exception("Received the message " +
- success._2() + " as response to " + JobManagerMessages.getRequestWebMonitorPort() +
- ". But a message of type " + ResponseWebMonitorPort.class + " was expected."));
- }
- } else {
- LOG.warn("Failed to retrieve leader gateway and port.", failure);
- leaderGatewayPortPromise.failure(failure);
- }
- }
- }, actorSystem.dispatcher());
- }
- catch (Exception e) {
- handleError(e);
- }
- }
- }
-
- @Override
- public void handleError(Exception exception) {
- LOG.error("Received error from LeaderRetrievalService.", exception);
-
- try {
- // stop associated webMonitor
- webMonitor.stop();
- }
- catch (Exception e) {
- LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 4777202..35d13dd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -18,9 +18,11 @@
package org.apache.flink.runtime.webmonitor;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -43,9 +45,7 @@ import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -72,8 +72,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
WebMonitorConfig cfg,
RequestHandler handler,
JobManagerRetriever retriever,
- Future<String> localJobManagerAddressFuture,
- FiniteDuration timeout,
+ CompletableFuture<String> localJobManagerAddressFuture,
+ Time timeout,
boolean httpsEnabled) {
super(retriever, localJobManagerAddressFuture, timeout, httpsEnabled);
@@ -87,7 +87,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
}
@Override
- protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
+ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
FullHttpResponse response;
try {
@@ -106,7 +106,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
queryParams.put(WEB_MONITOR_ADDRESS_KEY,
(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
- response = handler.handleRequest(pathParams, queryParams, jobManager);
+ response = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
}
catch (NotFoundException e) {
// this should result in a 404 error code (not found)
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index d524632..4cb55f1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -18,9 +18,11 @@
package org.apache.flink.runtime.webmonitor;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -29,11 +31,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -48,9 +48,9 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
private final JobManagerRetriever retriever;
- protected final Future<String> localJobManagerAddressFuture;
+ protected final CompletableFuture<String> localJobManagerAddressFuture;
- protected final FiniteDuration timeout;
+ protected final Time timeout;
/** Whether the web service has https enabled. */
protected final boolean httpsEnabled;
@@ -59,8 +59,8 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
public RuntimeMonitorHandlerBase(
JobManagerRetriever retriever,
- Future<String> localJobManagerAddressFuture,
- FiniteDuration timeout,
+ CompletableFuture<String> localJobManagerAddressFuture,
+ Time timeout,
boolean httpsEnabled) {
this.retriever = checkNotNull(retriever);
@@ -78,17 +78,17 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
@Override
protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
- if (localJobManagerAddressFuture.isCompleted()) {
+ if (localJobManagerAddressFuture.isDone()) {
if (localJobManagerAddress == null) {
- localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
+ localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
- Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
- if (jobManager.isDefined()) {
- Tuple2<ActorGateway, Integer> gatewayPort = jobManager.get();
+ if (optJobManagerGateway.isPresent()) {
+ JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
- localJobManagerAddress, gatewayPort);
+ localJobManagerAddress, jobManagerGateway, timeout);
if (redirectAddress != null) {
HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(),
@@ -96,7 +96,7 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
KeepAliveWrite.flush(ctx, routed.request(), redirect);
}
else {
- respondAsLeader(ctx, routed, gatewayPort._1());
+ respondAsLeader(ctx, routed, jobManagerGateway);
}
} else {
KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
@@ -106,5 +106,5 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
}
}
- protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager);
+ protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e27a15f..17f02f0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -71,12 +70,14 @@ import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
-import akka.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,16 +86,11 @@ import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -108,7 +104,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class WebRuntimeMonitor implements WebMonitor {
/** By default, all requests to the JobManager have a timeout of 10 seconds. */
- public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+ public static final Time DEFAULT_REQUEST_TIMEOUT = Time.seconds(10L);
/** Logger for web frontend startup / shutdown messages. */
private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
@@ -120,14 +116,15 @@ public class WebRuntimeMonitor implements WebMonitor {
private final LeaderRetrievalService leaderRetrievalService;
- /** LeaderRetrievalListener which stores the currently leading JobManager and its archive. */
+ /** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
private final JobManagerRetriever retriever;
private final SSLContext serverSSLContext;
- private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+ private final CompletableFuture<String> jobManagerAddressFuture = new CompletableFuture<>();
+
+ private final Time timeout;
- private final FiniteDuration timeout;
private final WebFrontendBootstrap netty;
private final File webRootDir;
@@ -142,7 +139,6 @@ public class WebRuntimeMonitor implements WebMonitor {
private AtomicBoolean cleanedUp = new AtomicBoolean();
- private ExecutorService executorService;
private MetricFetcher metricFetcher;
@@ -150,11 +146,15 @@ public class WebRuntimeMonitor implements WebMonitor {
Configuration config,
LeaderRetrievalService leaderRetrievalService,
BlobView blobView,
- ActorSystem actorSystem) throws IOException, InterruptedException {
+ JobManagerRetriever jobManagerRetriever,
+ MetricQueryServiceRetriever queryServiceRetriever,
+ Time timeout,
+ Executor executor) throws IOException, InterruptedException {
this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
- this.timeout = AkkaUtils.getTimeout(config);
- this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout);
+ this.retriever = Preconditions.checkNotNull(jobManagerRetriever);
+ this.timeout = Preconditions.checkNotNull(timeout);
+
this.cfg = new WebMonitorConfig(config);
final String configuredAddress = cfg.getWebFrontendAddress();
@@ -191,7 +191,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// - Back pressure stats ----------------------------------------------
- stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
+ stackTraceSamples = new StackTraceSampleCoordinator(executor, 60000);
// Back pressure stats tracker config
int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
@@ -209,10 +209,6 @@ public class WebRuntimeMonitor implements WebMonitor {
// --------------------------------------------------------------------
- executorService = new ForkJoinPool();
-
- ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService);
-
// Config to enable https access to the web-ui
boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config);
@@ -226,11 +222,11 @@ public class WebRuntimeMonitor implements WebMonitor {
} else {
serverSSLContext = null;
}
- metricFetcher = new MetricFetcher(actorSystem, retriever, context);
+ metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, executor, timeout);
String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY);
- JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, context, defaultSavepointDir);
+ JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, executor, defaultSavepointDir);
RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());
RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler());
@@ -274,8 +270,8 @@ public class WebRuntimeMonitor implements WebMonitor {
get(router,
new TaskManagerLogHandler(
retriever,
- context,
- jobManagerAddressPromise.future(),
+ executor,
+ jobManagerAddressFuture,
timeout,
TaskManagerLogHandler.FileMode.LOG,
config,
@@ -284,8 +280,8 @@ public class WebRuntimeMonitor implements WebMonitor {
get(router,
new TaskManagerLogHandler(
retriever,
- context,
- jobManagerAddressPromise.future(),
+ executor,
+ jobManagerAddressFuture,
timeout,
TaskManagerLogHandler.FileMode.STDOUT,
config,
@@ -296,27 +292,27 @@ public class WebRuntimeMonitor implements WebMonitor {
router
// log and stdout
.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
- new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile,
+ new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.logFile,
enableSSL))
.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
- new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
+ new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
enableSSL));
get(router, new JobManagerMetricsHandler(metricFetcher));
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
- get(router, new JobCancellationHandler());
+ get(router, new JobCancellationHandler(timeout));
// DELETE is the preferred way of canceling a job (Rest-conform)
- delete(router, new JobCancellationHandler());
+ delete(router, new JobCancellationHandler(timeout));
get(router, triggerHandler);
get(router, inProgressHandler);
// stop a job via GET (for proper integration with YARN this has to be performed via GET)
- get(router, new JobStoppingHandler());
+ get(router, new JobStoppingHandler(timeout));
// DELETE is the preferred way of stopping a job (Rest-conform)
- delete(router, new JobStoppingHandler());
+ delete(router, new JobStoppingHandler(timeout));
int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
@@ -351,7 +347,7 @@ public class WebRuntimeMonitor implements WebMonitor {
}
// this handler serves all the static contents
- router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir,
+ router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, webRootDir,
enableSSL));
// add shutdown hook for deleting the directories and remaining temp files on shutdown
@@ -387,7 +383,7 @@ public class WebRuntimeMonitor implements WebMonitor {
* @return array of all JsonArchivists relevant for the history server
*/
public static JsonArchivist[] getJsonArchivists() {
- JsonArchivist[] archivists = new JsonArchivist[]{
+ JsonArchivist[] archivists = {
new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
new JobPlanHandler.JobPlanJsonArchivist(),
@@ -418,7 +414,7 @@ public class WebRuntimeMonitor implements WebMonitor {
LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort());
synchronized (startupShutdownLock) {
- jobManagerAddressPromise.success(jobManagerAkkaUrl);
+ jobManagerAddressFuture.complete(jobManagerAkkaUrl);
leaderRetrievalService.start(retriever);
long delay = backPressureStatsTracker.getCleanUpInterval();
@@ -451,8 +447,6 @@ public class WebRuntimeMonitor implements WebMonitor {
backPressureStatsTracker.shutDown();
- executorService.shutdownNow();
-
cleanup();
}
}
@@ -522,7 +516,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// ------------------------------------------------------------------------
private RuntimeMonitorHandler handler(RequestHandler handler) {
- return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressPromise.future(), timeout,
+ return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressFuture, timeout,
serverSSLContext != null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index be6928e..15acb00 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -26,9 +26,10 @@ package org.apache.flink.runtime.webmonitor.files;
* https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
*****************************************************************************/
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -70,13 +71,9 @@ import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;
+import java.util.Optional;
import java.util.TimeZone;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
@@ -118,9 +115,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
private final JobManagerRetriever retriever;
- private final Future<String> localJobManagerAddressFuture;
+ private final CompletableFuture<String> localJobManagerAddressFuture;
- private final FiniteDuration timeout;
+ private final Time timeout;
/** The path in which the static documents are. */
private final File rootPath;
@@ -135,8 +132,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
public StaticFileServerHandler(
JobManagerRetriever retriever,
- Future<String> localJobManagerAddressPromise,
- FiniteDuration timeout,
+ CompletableFuture<String> localJobManagerAddressPromise,
+ Time timeout,
File rootPath,
boolean httpsEnabled) throws IOException {
@@ -145,8 +142,8 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
public StaticFileServerHandler(
JobManagerRetriever retriever,
- Future<String> localJobManagerAddressFuture,
- FiniteDuration timeout,
+ CompletableFuture<String> localJobManagerAddressFuture,
+ Time timeout,
File rootPath,
boolean httpsEnabled,
Logger logger) throws IOException {
@@ -165,9 +162,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
@Override
public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
- if (localJobManagerAddressFuture.isCompleted()) {
+ if (localJobManagerAddressFuture.isDone()) {
if (localJobManagerAddress == null) {
- localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
+ localJobManagerAddress = localJobManagerAddressFuture.get();
}
final HttpRequest request = routed.request();
@@ -183,12 +180,12 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
requestPath = "";
}
- Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+ Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
- if (jobManager.isDefined()) {
+ if (optJobManagerGateway.isPresent()) {
// Redirect to leader if necessary
String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
- localJobManagerAddress, jobManager.get());
+ localJobManagerAddress, optJobManagerGateway.get(), timeout);
if (redirectAddress != null) {
HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index d6c17af..89108db 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import java.util.Map;
+import java.util.Optional;
/**
* Base class for request handlers whose response depends on an ExecutionGraph
@@ -35,11 +38,11 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
private final ExecutionGraphHolder executionGraphHolder;
public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
- this.executionGraphHolder = executionGraphHolder;
+ this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
String jidString = pathParams.get("jobid");
if (jidString == null) {
throw new RuntimeException("JobId parameter missing");
@@ -53,12 +56,17 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
}
- AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
- if (eg == null) {
- throw new NotFoundException("Could not find job with id " + jid);
+ final Optional<AccessExecutionGraph> optGraph;
+
+ try {
+ optGraph = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
+ } catch (Exception e) {
+ throw new FlinkException("Could not retrieve ExecutionGraph for job with jobId " + jid + " from the JobManager.", e);
}
- return handleRequest(eg, pathParams);
+ final AccessExecutionGraph graph = optGraph.orElseThrow(() -> new NotFoundException("Could not find job with jobId " + jid + '.'));
+
+ return handleRequest(graph, pathParams);
}
public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
index 2b4a45f..266ffb0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -38,8 +38,8 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
private static final Charset ENCODING = Charset.forName("UTF-8");
@Override
- public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
- String result = handleJsonRequest(pathParams, queryParams, jobManager);
+ public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+ String result = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
byte[] bytes = result.getBytes(ENCODING);
DefaultFullHttpResponse response = new DefaultFullHttpResponse(
@@ -57,7 +57,7 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
*
* @param pathParams The map of REST path parameters, decoded by the router.
* @param queryParams The map of query parameters.
- * @param jobManager The JobManager actor.
+ * @param jobManagerGateway The gateway used to communicate with the JobManager.
*
* @return The JSON string that is the HTTP response.
*
@@ -69,6 +69,6 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
public abstract String handleJsonRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
- ActorGateway jobManager) throws Exception;
+ JobManagerGateway jobManagerGateway) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 816ef24..4ebc4e7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -27,10 +27,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,9 +44,9 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
- private final FiniteDuration timeout;
+ private final Time timeout;
- public ClusterOverviewHandler(FiniteDuration timeout) {
+ public ClusterOverviewHandler(Time timeout) {
this.timeout = checkNotNull(timeout);
}
@@ -58,12 +56,13 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
// we need no parameters, get all requests
try {
- if (jobManager != null) {
- Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
- StatusOverview overview = (StatusOverview) Await.result(future, timeout);
+ if (jobManagerGateway != null) {
+ CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
+
+ StatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 9d0b863..778a300 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -19,18 +19,16 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static java.util.Objects.requireNonNull;
@@ -43,9 +41,9 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
- private final FiniteDuration timeout;
+ private final Time timeout;
- public CurrentJobIdsHandler(FiniteDuration timeout) {
+ public CurrentJobIdsHandler(Time timeout) {
this.timeout = requireNonNull(timeout);
}
@@ -55,12 +53,12 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
// we need no parameters, get all requests
try {
- if (jobManager != null) {
- Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
- JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout);
+ if (jobManagerGateway != null) {
+ CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+ JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index d0518c8..b324426 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -35,10 +35,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,13 +49,13 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
- private final FiniteDuration timeout;
+ private final Time timeout;
private final boolean includeRunningJobs;
private final boolean includeFinishedJobs;
public CurrentJobsOverviewHandler(
- FiniteDuration timeout,
+ Time timeout,
boolean includeRunningJobs,
boolean includeFinishedJobs) {
@@ -79,13 +77,11 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
- if (jobManager != null) {
- Future<Object> future = jobManager.ask(
- new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
-
- MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
+ if (jobManagerGateway != null) {
+ CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
+ MultipleJobsDetails result = jobDetailsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
final long now = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 312c890..fe1d06b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.util.EnvironmentInformation;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -55,7 +55,7 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
return this.configString;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 9fbafb8..e27d125 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -18,9 +18,10 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -30,13 +31,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -49,35 +45,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class HandlerRedirectUtils {
- private static final Logger LOG = LoggerFactory.getLogger(HandlerRedirectUtils.class);
-
- /** Pattern to extract the host from an remote Akka URL. */
- private static final Pattern LeaderAddressHostPattern = Pattern.compile("^.+@(.+):([0-9]+)/user/.+$");
-
public static String getRedirectAddress(
String localJobManagerAddress,
- Tuple2<ActorGateway, Integer> leader) throws Exception {
+ JobManagerGateway jobManagerGateway,
+ Time timeout) throws Exception {
- final String leaderAddress = leader._1().path();
- final int webMonitorPort = leader._2();
+ final String leaderAddress = jobManagerGateway.getAddress();
final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
if (!localJobManagerAddress.equals(leaderAddress) &&
!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
// We are not the leader and need to redirect
- Matcher matcher = LeaderAddressHostPattern.matcher(leaderAddress);
-
- if (matcher.matches()) {
- String redirectAddress = String.format("%s:%d", matcher.group(1), webMonitorPort);
- return redirectAddress;
- }
- else {
- LOG.warn("Unexpected leader address pattern {}. Cannot extract host.", leaderAddress);
- }
- }
+ final String hostname = jobManagerGateway.getHostname();
- return null;
+ final CompletableFuture<Integer> webMonitorPortFuture = jobManagerGateway.requestWebPort(timeout);
+ final int webMonitorPort = webMonitorPortFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ return String.format("%s:%d", hostname, webMonitorPort);
+ } else {
+ return null;
+ }
}
public static HttpResponse getRedirectResponse(String redirectAddress, String path, boolean httpsEnabled) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 4a21fec..db55169 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import java.util.Map;
@@ -42,7 +42,7 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
return ERROR_MESSAGE;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 2572a76..73771bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -46,7 +46,7 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
final String file = pathParams.get("jarid");
try {
File[] list = jarDir.listFiles(new FilenameFilter() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4dd20b1..4f9b188 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -51,7 +51,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 1b25e7f..b239160 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -45,7 +45,7 @@ public class JarPlanHandler extends JarActionHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
JobGraph graph = getJobGraphAndClassLoader(config).f0;
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 282fea8..12ffa4f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -34,8 +34,6 @@ import java.io.File;
import java.io.StringWriter;
import java.util.Map;
-import scala.concurrent.duration.FiniteDuration;
-
/**
- * This handler handles requests to fetch plan for a jar.
+ * This handler handles requests to run a jar, i.e. to submit its job to the JobManager.
*/
@@ -43,13 +41,13 @@ public class JarRunHandler extends JarActionHandler {
static final String JAR_RUN_REST_PATH = "/jars/:jarid/run";
- private final FiniteDuration timeout;
+ private final Time timeout;
private final Configuration clientConfig;
- public JarRunHandler(File jarDirectory, FiniteDuration timeout, Configuration clientConfig) {
+ public JarRunHandler(File jarDirectory, Time timeout, Configuration clientConfig) {
super(jarDirectory);
- this.timeout = timeout;
- this.clientConfig = clientConfig;
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.clientConfig = Preconditions.checkNotNull(clientConfig);
}
@Override
@@ -58,17 +56,17 @@ public class JarRunHandler extends JarActionHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
try {
JobClient.submitJobDetached(
- new AkkaJobManagerGateway(jobManager),
+ jobManagerGateway,
clientConfig,
graph.f0,
- Time.milliseconds(timeout.toMillis()),
+ timeout,
graph.f1);
} catch (JobExecutionException e) {
throw new ProgramInvocationException("Failed to submit the job to the job manager", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 745a110..705c321 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import java.io.File;
import java.util.Map;
@@ -44,9 +44,9 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
@Override
public String handleJsonRequest(
- Map<String, String> pathParams,
- Map<String, String> queryParams,
- ActorGateway jobManager) throws Exception {
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ JobManagerGateway jobManagerGateway) throws Exception {
String tempFilePath = queryParams.get("filepath");
String filename = queryParams.get("filename");
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index d9de7d7..513dc08 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -19,8 +19,9 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
@@ -33,17 +34,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel";
private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
+ private final Time timeout;
+
+ public JobCancellationHandler(Time timeout) {
+ this.timeout = Preconditions.checkNotNull(timeout);
+ }
+
@Override
public String[] getPaths() {
return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH};
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
- JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
- if (jobManager != null) {
- jobManager.tell(new JobManagerMessages.CancelJob(jobid));
+ JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+ if (jobManagerGateway != null) {
+ jobManagerGateway.cancelJob(jobId, timeout);
return "{}";
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 7dd4a52..9b474aa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -19,16 +19,17 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -37,7 +38,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import akka.dispatch.OnComplete;
import com.fasterxml.jackson.core.JsonGenerator;
import javax.annotation.Nullable;
@@ -48,10 +48,9 @@ import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
-
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -92,16 +91,16 @@ public class JobCancellationWithSavepointHandlers {
public JobCancellationWithSavepointHandlers(
ExecutionGraphHolder currentGraphs,
- ExecutionContext executionContext) {
- this(currentGraphs, executionContext, null);
+ Executor executor) {
+ this(currentGraphs, executor, null);
}
public JobCancellationWithSavepointHandlers(
ExecutionGraphHolder currentGraphs,
- ExecutionContext executionContext,
+ Executor executor,
@Nullable String defaultSavepointDirectory) {
- this.triggerHandler = new TriggerHandler(currentGraphs, executionContext);
+ this.triggerHandler = new TriggerHandler(currentGraphs, executor);
this.inProgressHandler = new InProgressHandler();
this.defaultSavepointDirectory = defaultSavepointDirectory;
}
@@ -127,11 +126,11 @@ public class JobCancellationWithSavepointHandlers {
private final ExecutionGraphHolder currentGraphs;
- /** Execution context for futures. */
+ /** Executor on which the future callbacks are run. */
- private final ExecutionContext executionContext;
+ private final Executor executor;
- public TriggerHandler(ExecutionGraphHolder currentGraphs, ExecutionContext executionContext) {
+ public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) {
this.currentGraphs = checkNotNull(currentGraphs);
- this.executionContext = checkNotNull(executionContext);
+ this.executor = checkNotNull(executor);
}
@Override
@@ -144,35 +143,40 @@ public class JobCancellationWithSavepointHandlers {
public FullHttpResponse handleRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
- ActorGateway jobManager) throws Exception {
+ JobManagerGateway jobManagerGateway) throws Exception {
try {
- if (jobManager != null) {
+ if (jobManagerGateway != null) {
JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+ final Optional<AccessExecutionGraph> optGraph;
- AccessExecutionGraph graph = currentGraphs.getExecutionGraph(jobId, jobManager);
- if (graph == null) {
- throw new Exception("Cannot find ExecutionGraph for job.");
- } else {
- CheckpointCoordinator coord = graph.getCheckpointCoordinator();
- if (coord == null) {
- throw new Exception("Cannot find CheckpointCoordinator for job.");
- }
+ try {
+ optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
+ } catch (Exception e) {
+ throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
+ }
- String targetDirectory = pathParams.get("targetDirectory");
- if (targetDirectory == null) {
- if (defaultSavepointDirectory == null) {
- throw new IllegalStateException("No savepoint directory configured. " +
- "You can either specify a directory when triggering this savepoint or " +
- "configure a cluster-wide default via key '" +
- CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
- } else {
- targetDirectory = defaultSavepointDirectory;
- }
- }
+ final AccessExecutionGraph graph = optGraph.orElseThrow(
+ () -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
- return handleNewRequest(jobManager, jobId, targetDirectory, coord.getCheckpointTimeout());
+ CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+ if (coord == null) {
+ throw new Exception("Cannot find CheckpointCoordinator for job.");
}
+
+ String targetDirectory = pathParams.get("targetDirectory");
+ if (targetDirectory == null) {
+ if (defaultSavepointDirectory == null) {
+ throw new IllegalStateException("No savepoint directory configured. " +
+ "You can either specify a directory when triggering this savepoint or " +
+ "configure a cluster-wide default via key '" +
+ CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
+ } else {
+ targetDirectory = defaultSavepointDirectory;
+ }
+ }
+
+ return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
} else {
throw new Exception("No connection to the leading JobManager.");
}
@@ -182,7 +186,7 @@ public class JobCancellationWithSavepointHandlers {
}
@SuppressWarnings("unchecked")
- private FullHttpResponse handleNewRequest(ActorGateway jobManager, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
+ private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
// Check whether a request exists
final long requestId;
final boolean isNewRequest;
@@ -202,35 +206,25 @@ public class JobCancellationWithSavepointHandlers {
try {
// Trigger cancellation
- Object msg = new CancelJobWithSavepoint(jobId, targetDirectory);
- Future<Object> cancelFuture = jobManager
- .ask(msg, FiniteDuration.apply(checkpointTimeout, "ms"));
-
- cancelFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object resp) throws Throwable {
- synchronized (lock) {
- try {
- if (resp != null) {
- if (resp.getClass() == CancellationSuccess.class) {
- String path = ((CancellationSuccess) resp).savepointPath();
- completed.put(requestId, path);
- } else if (resp.getClass() == CancellationFailure.class) {
- Throwable cause = ((CancellationFailure) resp).cause();
- completed.put(requestId, cause);
- } else {
- Throwable cause = new IllegalStateException("Unexpected CancellationResponse of type " + resp.getClass());
- completed.put(requestId, cause);
- }
- } else {
- completed.put(requestId, failure);
- }
- } finally {
- inProgress.remove(jobId);
+ CompletableFuture<String> cancelJobFuture = jobManagerGateway
+ .cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout));
+
+ cancelJobFuture.whenCompleteAsync(
+ (String path, Throwable throwable) -> {
+ // This callback runs on the executor, so keep guarding the request bookkeeping
+ // with the lock, exactly as the replaced OnComplete callback did.
+ synchronized (lock) {
+ try {
+ if (throwable != null) {
+ completed.put(requestId, throwable);
+ } else {
+ completed.put(requestId, path);
+ }
+ } finally {
+ inProgress.remove(jobId);
}
}
- }, executionContext);
+ }, executor);
success = true;
} finally {
@@ -298,9 +288,9 @@ public class JobCancellationWithSavepointHandlers {
@Override
@SuppressWarnings("unchecked")
- public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+ public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
try {
- if (jobManager != null) {
+ if (jobManagerGateway != null) {
JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
long requestId = Long.parseLong(pathParams.get("requestId"));