You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:20:23 UTC
[51/51] [abbrv] flink git commit: [FLINK-2688] [monitoring api] Integrate monitoring request handler with HA leader handling
[FLINK-2688] [monitoring api] Integrate monitoring request handler with HA leader handling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/506ce61f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/506ce61f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/506ce61f
Branch: refs/heads/master
Commit: 506ce61f66b862037bc57c7c25ae838bd762f2ac
Parents: 8db523c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 16 22:04:19 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:53 2015 +0200
----------------------------------------------------------------------
.../optimizer/plantranslate/JsonMapper.java | 8 +-
.../webmonitor/ExecutionGraphHolder.java | 56 +++++++-----
.../runtime/webmonitor/WebRuntimeMonitor.java | 43 ++++++++--
.../handlers/ClusterOverviewHandler.java | 56 ++++++------
.../handlers/CurrentJobIdsHandler.java | 90 +++++++++++---------
.../handlers/CurrentJobsOverviewHandler.java | 82 ++++++++++--------
.../runtime/messages/GenericMessageTester.java | 2 +-
7 files changed, 207 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
index 4d81058..d5ddf4d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
@@ -107,8 +107,14 @@ public class JsonMapper {
case NESTEDLOOP_STREAMED_OUTER_SECOND:
return "Nested Loops (Streamed Outer: " + secondInputName + ")";
- case MERGE:
+ case INNER_MERGE:
return "Merge";
+ case FULL_OUTER_MERGE:
+ return "Full Outer Merge";
+ case LEFT_OUTER_MERGE:
+ return "Left Outer Merge";
+ case RIGHT_OUTER_MERGE:
+ return "Right Outer Merge";
case CO_GROUP:
return "Co-Group";
http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 74278a1..09ede4c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -30,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.util.WeakHashMap;
/**
- * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archiver.
+ * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive.
* <p>
* The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
* at some point once no one else is pointing to the ExecutionGraph.
@@ -39,26 +41,33 @@ import java.util.WeakHashMap;
*/
public class ExecutionGraphHolder {
- private final ActorGateway source;
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
+
+ /** Retrieves the current leading JobManager and its corresponding archive */
+ private final JobManagerArchiveRetriever retriever;
private final FiniteDuration timeout;
private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
-
- public ExecutionGraphHolder(ActorGateway source) {
- this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+ public ExecutionGraphHolder(JobManagerArchiveRetriever retriever) {
+ this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}
- public ExecutionGraphHolder(ActorGateway source, FiniteDuration timeout) {
- if (source == null || timeout == null) {
+ public ExecutionGraphHolder(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+ if (retriever == null || timeout == null) {
throw new NullPointerException();
}
- this.source = source;
+ this.retriever = retriever;
this.timeout = timeout;
}
-
+ /**
+ * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
+ *
+ * @param jid jobID of the execution graph to be retrieved
+ * @return the retrieved execution graph or null if it is not retrievable
+ */
public ExecutionGraph getExecutionGraph(JobID jid) {
ExecutionGraph cached = cache.get(jid);
if (cached != null) {
@@ -66,18 +75,27 @@ public class ExecutionGraphHolder {
}
try {
- Future<Object> future = source.ask(new JobManagerMessages.RequestJob(jid), timeout);
- Object result = Await.result(future, timeout);
- if (result instanceof JobManagerMessages.JobNotFound) {
- return null;
- }
- else if (result instanceof JobManagerMessages.JobFound) {
- ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
- cache.put(jid, eg);
- return eg;
+ ActorGateway jobManager = retriever.getJobManagerGateway();
+
+ if (jobManager != null) {
+ Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
+ Object result = Await.result(future, timeout);
+
+ if (result instanceof JobManagerMessages.JobNotFound) {
+ return null;
+ }
+ else if (result instanceof JobManagerMessages.JobFound) {
+ ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
+ cache.put(jid, eg);
+ return eg;
+ }
+ else {
+ throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
+ }
}
else {
- throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
+ LOG.warn("No connection to the leading JobManager.");
+ return null;
}
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 4633dcf..a34823e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor;
+import akka.actor.ActorSystem;
+
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -33,7 +35,8 @@ import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
@@ -63,6 +66,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* The root component of the web runtime monitor. This class starts the web server and creates
* all request handlers for the REST API.
@@ -84,7 +89,13 @@ public class WebRuntimeMonitor implements WebMonitor {
// ------------------------------------------------------------------------
+ /** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
private final Object startupShutdownLock = new Object();
+
+ private final LeaderRetrievalService leaderRetrievalService;
+
+ /** LeaderRetrievalListener which stores the currently leading JobManager and its archive */
+ private final JobManagerArchiveRetriever retriever;
private final Router router;
@@ -95,7 +106,12 @@ public class WebRuntimeMonitor implements WebMonitor {
private Channel serverChannel;
- public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, ActorGateway archive) throws IOException {
+ public WebRuntimeMonitor(
+ Configuration config,
+ LeaderRetrievalService leaderRetrievalService,
+ ActorSystem actorSystem) throws IOException
+ {
+ this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
final WebMonitorConfig cfg = new WebMonitorConfig(config);
@@ -127,22 +143,27 @@ public class WebRuntimeMonitor implements WebMonitor {
if (this.configuredPort < 0) {
throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
}
+
+ FiniteDuration timeout = AkkaUtils.getTimeout(config);
+ FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);
+
+ retriever = new JobManagerArchiveRetriever(this, actorSystem, lookupTimeout, timeout);
- ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(jobManager);
-
+ ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(retriever);
+
router = new Router()
// config how to interact with this web server
.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
// the overview - how many task managers, slots, free slots, ...
- .GET("/overview", handler(new ClusterOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT)))
+ .GET("/overview", handler(new ClusterOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
// overview over jobs
- .GET("/joboverview", handler(new CurrentJobsOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT, true, true)))
- .GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT, true, false)))
- .GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT, false, true)))
+ .GET("/joboverview", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, true, true)))
+ .GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, true, false)))
+ .GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, false, true)))
- .GET("/jobs", handler(new CurrentJobIdsHandler(jobManager, DEFAULT_REQUEST_TIMEOUT)))
+ .GET("/jobs", handler(new CurrentJobIdsHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
.GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs)))
@@ -203,12 +224,16 @@ public class WebRuntimeMonitor implements WebMonitor {
int port = bindAddress.getPort();
LOG.info("Web frontend listening at " + address + ':' + port);
+
+ leaderRetrievalService.start(retriever);
}
}
@Override
public void stop() throws Exception {
synchronized (startupShutdownLock) {
+ leaderRetrievalService.stop();
+
if (this.serverChannel != null) {
this.serverChannel.close().awaitUninterruptibly();
this.serverChannel = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 2d8b217..dde368b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -35,44 +36,51 @@ import java.util.Map;
* TaskManagers are currently connected, and how many jobs are running.
*/
public class ClusterOverviewHandler implements RequestHandler, RequestHandler.JsonResponse {
-
- private final ActorGateway jobManager;
-
+
+ private final JobManagerArchiveRetriever retriever;
+
private final FiniteDuration timeout;
-
- public ClusterOverviewHandler(ActorGateway jobManager, FiniteDuration timeout) {
- if (jobManager == null || timeout == null) {
+
+ public ClusterOverviewHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+ if (retriever == null || timeout == null) {
throw new NullPointerException();
}
- this.jobManager = jobManager;
+ this.retriever = retriever;
this.timeout = timeout;
}
-
+
@Override
public String handleRequest(Map<String, String> params) throws Exception {
+ // we need no parameters, get all requests
try {
- Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
- StatusOverview overview = (StatusOverview) Await.result(future, timeout);
+ ActorGateway jobManager = retriever.getJobManagerGateway();
+
+ if (jobManager != null) {
+ Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
+ StatusOverview overview = (StatusOverview) Await.result(future, timeout);
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
- gen.writeStartObject();
- gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
- gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
- gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
- gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
- gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
- gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
- gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
- gen.writeEndObject();
+ gen.writeStartObject();
+ gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
+ gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
+ gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
+ gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
+ gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
+ gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
+ gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
+ gen.writeEndObject();
- gen.close();
- return writer.toString();
+ gen.close();
+ return writer.toString();
+ } else {
+ throw new Exception("No connection to the leading JobManager.");
+ }
}
catch (Exception e) {
- throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
+ throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index e7e96e2..049bd54 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -37,17 +38,17 @@ import java.util.Map;
* given the JobManager or Archive Actor Reference.
*/
public class CurrentJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
-
- private final ActorGateway target;
-
+
+ private final JobManagerArchiveRetriever retriever;
+
private final FiniteDuration timeout;
-
-
- public CurrentJobIdsHandler(ActorGateway target, FiniteDuration timeout) {
- if (target == null || timeout == null) {
+
+
+ public CurrentJobIdsHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+ if (retriever == null || timeout == null) {
throw new NullPointerException();
}
- this.target = target;
+ this.retriever = retriever;
this.timeout = timeout;
}
@@ -55,42 +56,49 @@ public class CurrentJobIdsHandler implements RequestHandler, RequestHandler.Json
public String handleRequest(Map<String, String> params) throws Exception {
// we need no parameters, get all requests
try {
- Future<Object> future = target.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
- JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout);
-
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
-
- gen.writeStartObject();
+ ActorGateway jobManager = retriever.getJobManagerGateway();
- gen.writeArrayFieldStart("jobs-running");
- for (JobID jid : overview.getJobsRunningOrPending()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-finished");
- for (JobID jid : overview.getJobsFinished()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-cancelled");
- for (JobID jid : overview.getJobsCancelled()) {
- gen.writeString(jid.toString());
+ if (jobManager != null) {
+ Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
+ JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout);
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+
+ gen.writeStartObject();
+
+ gen.writeArrayFieldStart("jobs-running");
+ for (JobID jid : overview.getJobsRunningOrPending()) {
+ gen.writeString(jid.toString());
+ }
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart("jobs-finished");
+ for (JobID jid : overview.getJobsFinished()) {
+ gen.writeString(jid.toString());
+ }
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart("jobs-cancelled");
+ for (JobID jid : overview.getJobsCancelled()) {
+ gen.writeString(jid.toString());
+ }
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart("jobs-failed");
+ for (JobID jid : overview.getJobsFailed()) {
+ gen.writeString(jid.toString());
+ }
+ gen.writeEndArray();
+
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
}
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-failed");
- for (JobID jid : overview.getJobsFailed()) {
- gen.writeString(jid.toString());
+ else {
+ throw new Exception("No connection to the leading JobManager.");
}
- gen.writeEndArray();
-
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
}
catch (Exception e) {
throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 6444e4b..d9bfcb7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -36,8 +37,8 @@ import java.util.Map;
* Request handler that returns a summary of the job status.
*/
public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandler.JsonResponse {
-
- private final ActorGateway jobManager;
+
+ private final JobManagerArchiveRetriever retriever;
private final FiniteDuration timeout;
@@ -45,9 +46,12 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
private final boolean includeFinishedJobs;
- public CurrentJobsOverviewHandler(ActorGateway jobManager, FiniteDuration timeout,
+ public CurrentJobsOverviewHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout,
boolean includeRunningJobs, boolean includeFinishedJobs) {
- this.jobManager = jobManager;
+ if (retriever == null || timeout == null) {
+ throw new NullPointerException();
+ }
+ this.retriever = retriever;
this.timeout = timeout;
this.includeRunningJobs = includeRunningJobs;
this.includeFinishedJobs = includeFinishedJobs;
@@ -56,42 +60,50 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
@Override
public String handleRequest(Map<String, String> params) throws Exception {
try {
- Future<Object> future = jobManager.ask(
- new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
-
- MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
-
- final long now = System.currentTimeMillis();
-
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
- gen.writeStartObject();
-
-
- if (includeRunningJobs && includeFinishedJobs) {
- gen.writeArrayFieldStart("running");
- for (JobDetails detail : result.getRunningJobs()) {
- generateSingleJobDetails(detail, gen, now);
+ ActorGateway jobManager = retriever.getJobManagerGateway();
+
+ if (jobManager != null) {
+
+ Future<Object> future = jobManager.ask(
+ new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
+
+ MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
+
+ final long now = System.currentTimeMillis();
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+ gen.writeStartObject();
+
+
+ if (includeRunningJobs && includeFinishedJobs) {
+ gen.writeArrayFieldStart("running");
+ for (JobDetails detail : result.getRunningJobs()) {
+ generateSingleJobDetails(detail, gen, now);
+ }
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart("finished");
+ for (JobDetails detail : result.getFinishedJobs()) {
+ generateSingleJobDetails(detail, gen, now);
+ }
+ gen.writeEndArray();
}
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("finished");
- for (JobDetails detail : result.getFinishedJobs()) {
- generateSingleJobDetails(detail, gen, now);
+ else {
+ gen.writeArrayFieldStart("jobs");
+ for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
+ generateSingleJobDetails(detail, gen, now);
+ }
+ gen.writeEndArray();
}
- gen.writeEndArray();
+
+ gen.writeEndObject();
+ gen.close();
+ return writer.toString();
}
else {
- gen.writeArrayFieldStart("jobs");
- for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
- generateSingleJobDetails(detail, gen, now);
- }
- gen.writeEndArray();
+ throw new Exception("No connection to the leading JobManager.");
}
-
- gen.writeEndObject();
- gen.close();
- return writer.toString();
}
catch (Exception e) {
throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
index dcf4839..76c1bd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.messages;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
import java.io.Serializable;
import java.lang.reflect.Constructor;