You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/08/03 14:59:02 UTC
flink git commit: [FLINK-2409] [webserver] Replaces ActorRefs with
ActorGateways in the web server to automatically decorate messages with a
leader session ID.
Repository: flink
Updated Branches:
refs/heads/master 416ff589e -> fab61a195
[FLINK-2409] [webserver] Replaces ActorRefs with ActorGateways in the web server to automatically decorate messages with a leader session ID.
Refactored MiniCluster to also store a reference to the web server so that it can be stopped. Adds support for the new web interface for YARN.
Fix web server start condition
This closes #959.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fab61a19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fab61a19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fab61a19
Branch: refs/heads/master
Commit: fab61a1954ff1554448e826e1d273689ed520fc3
Parents: 416ff58
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 29 18:03:52 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Aug 3 13:47:31 2015 +0200
----------------------------------------------------------------------
.../webmonitor/ExecutionGraphHolder.java | 14 +--
.../runtime/webmonitor/WebRuntimeMonitor.java | 5 +-
.../handlers/RequestJobIdsHandler.java | 14 +--
.../handlers/RequestOverviewHandler.java | 14 +--
.../legacy/JobManagerInfoHandler.java | 55 ++++++-----
.../jobmanager/web/JobManagerInfoServlet.java | 52 +++++------
.../jobmanager/web/SetupInfoServlet.java | 18 ++--
.../runtime/jobmanager/web/WebInfoServer.java | 8 +-
.../flink/runtime/jobmanager/JobManager.scala | 97 +++++++++----------
.../runtime/minicluster/FlinkMiniCluster.scala | 50 +++++++++-
.../minicluster/LocalFlinkMiniCluster.scala | 45 +++++----
.../flink/runtime/taskmanager/TaskManager.scala | 2 +-
.../runtime/testingUtils/TestingCluster.scala | 21 +++--
.../test/util/ForkableFlinkMiniCluster.scala | 41 ++++----
.../apache/flink/yarn/ApplicationMaster.scala | 98 +++++++++++++-------
15 files changed, 293 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 18a548c..a017f3a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -18,12 +18,9 @@
package org.apache.flink.runtime.webmonitor;
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import scala.concurrent.Await;
@@ -42,18 +39,18 @@ import java.util.WeakHashMap;
*/
public class ExecutionGraphHolder {
- private final ActorRef source;
+ private final ActorGateway source;
private final FiniteDuration timeout;
private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
- public ExecutionGraphHolder(ActorRef source) {
+ public ExecutionGraphHolder(ActorGateway source) {
this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}
- public ExecutionGraphHolder(ActorRef source, FiniteDuration timeout) {
+ public ExecutionGraphHolder(ActorGateway source, FiniteDuration timeout) {
if (source == null || timeout == null) {
throw new NullPointerException();
}
@@ -69,8 +66,7 @@ public class ExecutionGraphHolder {
}
try {
- Timeout to = new Timeout(timeout);
- Future<Object> future = Patterns.ask(source, new JobManagerMessages.RequestJob(jid), to);
+ Future<Object> future = source.ask(new JobManagerMessages.RequestJob(jid), timeout);
Object result = Await.result(future, timeout);
if (result instanceof JobManagerMessages.JobNotFound) {
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index a2095d4..006d18d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.webmonitor;
-import akka.actor.ActorRef;
-
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -35,6 +33,7 @@ import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
@@ -88,7 +87,7 @@ public class WebRuntimeMonitor implements WebMonitor {
private Channel serverChannel;
- public WebRuntimeMonitor(Configuration config, ActorRef jobManager, ActorRef archive) throws IOException {
+ public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, ActorGateway archive) throws IOException {
// figure out where our static contents is
final String configuredWebRoot = config.getString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
index 1f28a01..aa1a39f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
@@ -18,10 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.webmonitor.JsonFactory;
@@ -40,15 +37,15 @@ import java.util.Map;
*/
public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
- private final ActorRef target;
+ private final ActorGateway target;
private final FiniteDuration timeout;
- public RequestJobIdsHandler(ActorRef target) {
+ public RequestJobIdsHandler(ActorGateway target) {
this(target, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}
- public RequestJobIdsHandler(ActorRef target, FiniteDuration timeout) {
+ public RequestJobIdsHandler(ActorGateway target, FiniteDuration timeout) {
if (target == null || timeout == null) {
throw new NullPointerException();
}
@@ -60,8 +57,7 @@ public class RequestJobIdsHandler implements RequestHandler, RequestHandler.Json
public String handleRequest(Map<String, String> params) throws Exception {
// we need no parameters, get all requests
try {
- Timeout to = new Timeout(timeout);
- Future<Object> future = Patterns.ask(target, RequestJobsWithIDsOverview.getInstance(), to);
+ Future<Object> future = target.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout);
return JsonFactory.generateJobsOverviewJSON(result);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
index e51a4d1..c2c00c7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
@@ -18,10 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview;
import org.apache.flink.runtime.webmonitor.JsonFactory;
@@ -39,16 +36,16 @@ import java.util.Map;
*/
public class RequestOverviewHandler implements RequestHandler, RequestHandler.JsonResponse {
- private final ActorRef jobManager;
+ private final ActorGateway jobManager;
private final FiniteDuration timeout;
- public RequestOverviewHandler(ActorRef jobManager) {
+ public RequestOverviewHandler(ActorGateway jobManager) {
this(jobManager, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}
- public RequestOverviewHandler(ActorRef jobManager, FiniteDuration timeout) {
+ public RequestOverviewHandler(ActorGateway jobManager, FiniteDuration timeout) {
if (jobManager == null || timeout == null) {
throw new NullPointerException();
}
@@ -59,8 +56,7 @@ public class RequestOverviewHandler implements RequestHandler, RequestHandler.J
@Override
public String handleRequest(Map<String, String> params) throws Exception {
try {
- Timeout to = new Timeout(timeout);
- Future<Object> future = Patterns.ask(jobManager, RequestStatusWithJobIDsOverview.getInstance(), to);
+ Future<Object> future = jobManager.ask(RequestStatusWithJobIDsOverview.getInstance(), timeout);
StatusWithJobIDsOverview result = (StatusWithJobIDsOverview) Await.result(future, timeout);
return JsonFactory.generateOverviewWithJobIDsJSON(result);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
index 0a1e08c..9b52736 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
@@ -18,10 +18,6 @@
package org.apache.flink.runtime.webmonitor.legacy;
-import akka.actor.ActorRef;
-
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@@ -41,6 +37,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -78,12 +75,12 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
private static final Charset ENCODING = Charset.forName("UTF-8");
/** Underlying JobManager */
- private final ActorRef jobmanager;
- private final ActorRef archive;
+ private final ActorGateway jobmanager;
+ private final ActorGateway archive;
private final FiniteDuration timeout;
- public JobManagerInfoHandler(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
+ public JobManagerInfoHandler(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
this.jobmanager = jobmanager;
this.archive = archive;
this.timeout = timeout;
@@ -118,8 +115,7 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
@SuppressWarnings("unchecked")
private String handleRequest(Routed routed) throws Exception {
if ("archive".equals(routed.queryParam("get"))) {
- Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(),
- new Timeout(timeout));
+ Future<Object> response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
Object result = Await.result(response, timeout);
@@ -135,8 +131,7 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
}
}
else if ("jobcounts".equals(routed.queryParam("get"))) {
- Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(),
- new Timeout(timeout));
+ Future<Object> response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
Object result = Await.result(response, timeout);
@@ -152,8 +147,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
else if ("job".equals(routed.queryParam("get"))) {
String jobId = routed.queryParam("job");
- Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
- new Timeout(timeout));
+ Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+ timeout);
Object result = Await.result(response, timeout);
@@ -182,8 +177,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
throw new Exception("Found null groupVertexId");
}
- Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
- new Timeout(timeout));
+ Future<Object> response = archive.ask(new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+ timeout);
Object result = Await.result(response, timeout);
@@ -205,9 +200,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
}
}
else if ("taskmanagers".equals(routed.queryParam("get"))) {
- Future<Object> response = Patterns.ask(jobmanager,
+ Future<Object> response = jobmanager.ask(
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
- new Timeout(timeout));
+ timeout);
Object result = Await.result(response, timeout);
@@ -219,9 +214,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
else {
final int numberOfTaskManagers = (Integer)result;
- final Future<Object> responseRegisteredSlots = Patterns.ask(jobmanager,
+ final Future<Object> responseRegisteredSlots = jobmanager.ask(
JobManagerMessages.getRequestTotalNumberOfSlots(),
- new Timeout(timeout));
+ timeout);
final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
timeout);
@@ -242,8 +237,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
else if ("cancel".equals(routed.queryParam("get"))) {
String jobId = routed.queryParam("job");
- Future<Object> response = Patterns.ask(jobmanager, new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
- new Timeout(timeout));
+ Future<Object> response = jobmanager.ask(new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
+ timeout);
Await.ready(response, timeout);
return "{}";
@@ -256,8 +251,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
return writeJsonForVersion();
}
else{
- Future<Object> response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(),
- new Timeout(timeout));
+ Future<Object> response = jobmanager.ask(JobManagerMessages.getRequestRunningJobs(),
+ timeout);
Object result = Await.result(response, timeout);
@@ -454,8 +449,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
}
// write accumulators
- final Future<Object> response = Patterns.ask(jobmanager,
- new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout));
+ final Future<Object> response = jobmanager.ask(
+ new RequestAccumulatorResultsStringified(graph.getJobID()),
+ timeout);
Object result;
try {
@@ -549,9 +545,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
private String writeJsonUpdatesForJob(JobID jobId) {
- final Future<Object> responseArchivedJobs = Patterns.ask(jobmanager,
+ final Future<Object> responseArchivedJobs = jobmanager.ask(
JobManagerMessages.getRequestRunningJobs(),
- new Timeout(timeout));
+ timeout);
Object resultArchivedJobs;
try{
@@ -591,8 +587,9 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
}
bld.append("],");
- final Future<Object> responseJob = Patterns.ask(jobmanager, new JobManagerMessages.RequestJob(jobId),
- new Timeout(timeout));
+ final Future<Object> responseJob = jobmanager.ask(
+ new JobManagerMessages.RequestJob(jobId),
+ timeout);
Object resultJob;
try{
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 82ab63e..ce57714 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -32,12 +32,9 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import akka.actor.ActorRef;
-
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
import org.apache.flink.runtime.messages.ArchiveMessages;
@@ -78,12 +75,12 @@ public class JobManagerInfoServlet extends HttpServlet {
private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoServlet.class);
/** Underlying JobManager */
- private final ActorRef jobmanager;
- private final ActorRef archive;
+ private final ActorGateway jobmanager;
+ private final ActorGateway archive;
private final FiniteDuration timeout;
- public JobManagerInfoServlet(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
+ public JobManagerInfoServlet(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
this.jobmanager = jobmanager;
this.archive = archive;
this.timeout = timeout;
@@ -102,8 +99,7 @@ public class JobManagerInfoServlet extends HttpServlet {
try {
if("archive".equals(req.getParameter("get"))) {
- response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(),
- new Timeout(timeout));
+ response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
result = Await.result(response, timeout);
@@ -119,8 +115,7 @@ public class JobManagerInfoServlet extends HttpServlet {
}
}
else if("jobcounts".equals(req.getParameter("get"))) {
- response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(),
- new Timeout(timeout));
+ response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
result = Await.result(response, timeout);
@@ -135,8 +130,7 @@ public class JobManagerInfoServlet extends HttpServlet {
else if("job".equals(req.getParameter("get"))) {
String jobId = req.getParameter("job");
- response = Patterns.ask(archive, new RequestJob(JobID.fromHexString(jobId)),
- new Timeout(timeout));
+ response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout);
result = Await.result(response, timeout);
@@ -163,8 +157,7 @@ public class JobManagerInfoServlet extends HttpServlet {
return;
}
- response = Patterns.ask(archive, new RequestJob(JobID.fromHexString(jobId)),
- new Timeout(timeout));
+ response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout);
result = Await.result(response, timeout);
@@ -186,9 +179,9 @@ public class JobManagerInfoServlet extends HttpServlet {
}
else if("taskmanagers".equals(req.getParameter("get"))) {
- response = Patterns.ask(jobmanager,
+ response = jobmanager.ask(
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
- new Timeout(timeout));
+ timeout);
result = Await.result(response, timeout);
@@ -199,9 +192,9 @@ public class JobManagerInfoServlet extends HttpServlet {
} else {
final int numberOfTaskManagers = (Integer)result;
- final Future<Object> responseRegisteredSlots = Patterns.ask(jobmanager,
+ final Future<Object> responseRegisteredSlots = jobmanager.ask(
JobManagerMessages.getRequestTotalNumberOfSlots(),
- new Timeout(timeout));
+ timeout);
final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
timeout);
@@ -221,8 +214,9 @@ public class JobManagerInfoServlet extends HttpServlet {
else if("cancel".equals(req.getParameter("get"))) {
String jobId = req.getParameter("job");
- response = Patterns.ask(jobmanager, new CancelJob(JobID.fromHexString(jobId)),
- new Timeout(timeout));
+ response = jobmanager.ask(
+ new CancelJob(JobID.fromHexString(jobId)),
+ timeout);
Await.ready(response, timeout);
}
@@ -233,8 +227,9 @@ public class JobManagerInfoServlet extends HttpServlet {
writeJsonForVersion(resp.getWriter());
}
else{
- response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(),
- new Timeout(timeout));
+ response = jobmanager.ask(
+ JobManagerMessages.getRequestRunningJobs(),
+ timeout);
result = Await.result(response, timeout);
@@ -471,8 +466,8 @@ public class JobManagerInfoServlet extends HttpServlet {
}
// write accumulators
- final Future<Object> response = Patterns.ask(jobmanager,
- new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout));
+ final Future<Object> response = jobmanager.ask(
+ new RequestAccumulatorResultsStringified(graph.getJobID()), timeout);
Object result;
try {
@@ -575,9 +570,9 @@ public class JobManagerInfoServlet extends HttpServlet {
private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
try {
- final Future<Object> responseArchivedJobs = Patterns.ask(jobmanager,
+ final Future<Object> responseArchivedJobs = jobmanager.ask(
JobManagerMessages.getRequestRunningJobs(),
- new Timeout(timeout));
+ timeout);
Object resultArchivedJobs = null;
@@ -615,8 +610,7 @@ public class JobManagerInfoServlet extends HttpServlet {
wrt.write("],");
- final Future<Object> responseJob = Patterns.ask(jobmanager, new RequestJob(jobId),
- new Timeout(timeout));
+ final Future<Object> responseJob = jobmanager.ask(new RequestJob(jobId), timeout);
Object resultJob = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index c3df253..1f2bfe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -32,10 +32,8 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
@@ -67,13 +65,13 @@ public class SetupInfoServlet extends HttpServlet {
final private Configuration configuration;
- final private ActorRef jobmanager;
+ final private ActorGateway jobmanager;
final private FiniteDuration timeout;
- public SetupInfoServlet(Configuration conf, ActorRef jm, FiniteDuration timeout) {
+ public SetupInfoServlet(Configuration conf, ActorGateway jobManager, FiniteDuration timeout) {
configuration = conf;
- this.jobmanager = jm;
+ this.jobmanager = jobManager;
this.timeout = timeout;
}
@@ -114,9 +112,9 @@ public class SetupInfoServlet extends HttpServlet {
private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
- final Future<Object> response = Patterns.ask(jobmanager,
+ final Future<Object> response = jobmanager.ask(
JobManagerMessages.getRequestRegisteredTaskManagers(),
- new Timeout(timeout));
+ timeout);
Object obj = null;
@@ -183,9 +181,9 @@ public class SetupInfoServlet extends HttpServlet {
StackTrace message = null;
Throwable exception = null;
- final Future<Object> response = Patterns.ask(jobmanager,
+ final Future<Object> response = jobmanager.ask(
new RequestStackTrace(instanceID),
- new Timeout(timeout));
+ timeout);
try {
message = (StackTrace) Await.result(response, timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index a414cf6..4383b65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -23,12 +23,12 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
-import akka.actor.ActorRef;
-
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.Server;
@@ -45,7 +45,7 @@ import scala.concurrent.duration.FiniteDuration;
* This class sets up a web-server that contains a web frontend to display information about running jobs.
* It instantiates and configures an embedded jetty server.
*/
-public class WebInfoServer {
+public class WebInfoServer implements WebMonitor {
/** Web root dir in the jar */
private static final String WEB_ROOT_DIR = "web-docs-infoserver";
@@ -70,7 +70,7 @@ public class WebInfoServer {
* @throws IOException
* Thrown, if the server setup failed for an I/O related reason.
*/
- public WebInfoServer(Configuration config, ActorRef jobmanager, ActorRef archive) throws IOException {
+ public WebInfoServer(Configuration config, ActorGateway jobmanager, ActorGateway archive) throws IOException {
if (config == null) {
throw new IllegalArgumentException("No Configuration has been passed to the web server");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7bf4447..5c0f468 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1128,17 +1128,27 @@ object JobManager {
"TaskManager_Process_Reaper")
}
- // start the job manager web frontend
- if (configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
- LOG.info("Starting NEW JobManger web frontend")
-
- // start the new web frontend. we need to load this dynamically
- // because it is not in the same project/dependencies
- startWebRuntimeMonitor(configuration, jobManager, archiver)
- }
- else if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
- LOG.info("Starting JobManger web frontend")
- val webServer = new WebInfoServer(configuration, jobManager, archiver)
+ if(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+ val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
+ val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
+ val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID())
+
+ // start the job manager web frontend
+ val webServer = if (
+ configuration.getBoolean(
+ ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
+ false)) {
+
+ LOG.info("Starting NEW JobManger web frontend")
+ // start the new web frontend. we need to load this dynamically
+ // because it is not in the same project/dependencies
+ startWebRuntimeMonitor(configuration, jobManagerGateway, archiverGateway)
+ }
+ else {
+ LOG.info("Starting JobManger web frontend")
+ new WebInfoServer(configuration, jobManagerGateway, archiverGateway)
+ }
+
webServer.start()
}
}
@@ -1570,46 +1580,37 @@ object JobManager {
* this method does not throw any exceptions, but only logs them.
*
* @param config The configuration for the runtime monitor.
- * @param jobManager The JobManager actor.
+ * @param jobManager The JobManager actor gateway.
* @param archiver The execution graph archive actor.
*/
- def startWebRuntimeMonitor(config: Configuration,
- jobManager: ActorRef,
- archiver: ActorRef): Unit = {
+ def startWebRuntimeMonitor(
+ config: Configuration,
+ jobManager: ActorGateway,
+ archiver: ActorGateway)
+ : WebMonitor = {
// try to load and instantiate the class
- val monitor: WebMonitor =
- try {
- val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
- val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
- .asSubclass(classOf[WebMonitor])
-
- val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration],
- classOf[ActorRef],
- classOf[ActorRef])
- ctor.newInstance(config, jobManager, archiver)
- }
- catch {
- case e: ClassNotFoundException =>
- LOG.error("Could not load web runtime monitor. " +
- "Probably reason: flink-runtime-web is not in the classpath")
- LOG.debug("Caught exception", e)
- null
- case e: InvocationTargetException =>
- LOG.error("WebServer could not be created", e.getTargetException())
- null
- case t: Throwable =>
- LOG.error("Failed to instantiate web runtime monitor.", t)
- null
- }
-
- if (monitor != null) {
- try {
- monitor.start()
- }
- catch {
- case e: Exception =>
- LOG.error("Failed to start web runtime monitor", e)
- }
+ try {
+ val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
+ val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
+ .asSubclass(classOf[WebMonitor])
+
+ val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration],
+ classOf[ActorGateway],
+ classOf[ActorGateway])
+ ctor.newInstance(config, jobManager, archiver)
+ }
+ catch {
+ case e: ClassNotFoundException =>
+ LOG.error("Could not load web runtime monitor. " +
+ "Probably reason: flink-runtime-web is not in the classpath")
+ LOG.debug("Caught exception", e)
+ null
+ case e: InvocationTargetException =>
+ LOG.error("WebServer could not be created", e.getTargetException())
+ null
+ case t: Throwable =>
+ LOG.error("Failed to instantiate web runtime monitor.", t)
+ null
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 6f810fc..7c57233 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -30,10 +30,12 @@ import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.{JobExecutionException, JobClient,
SerializedJobExecutionResult}
-import org.apache.flink.runtime.instance.ActorGateway
+import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.webmonitor.WebMonitor
import org.slf4j.LoggerFactory
import scala.concurrent.duration.FiniteDuration
@@ -74,7 +76,7 @@ abstract class FlinkMiniCluster(
val configuration = generateConfiguration(userConfiguration)
var jobManagerActorSystem = startJobManagerActorSystem()
- var jobManagerActor = startJobManager(jobManagerActorSystem)
+ var (jobManagerActor, webMonitor) = startJobManager(jobManagerActorSystem)
val numTaskManagers = configuration.getInteger(
ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
@@ -99,7 +101,7 @@ abstract class FlinkMiniCluster(
def generateConfiguration(userConfiguration: Configuration): Configuration
- def startJobManager(system: ActorSystem): ActorRef
+ def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor])
def startTaskManager(index: Int, system: ActorSystem): ActorRef
@@ -156,6 +158,10 @@ abstract class FlinkMiniCluster(
}
def shutdown(): Unit = {
+ webMonitor foreach {
+ _.stop()
+ }
+
val futures = taskManagerActors map {
gracefulStop(_, timeout)
}
@@ -183,6 +189,44 @@ abstract class FlinkMiniCluster(
}
}
+ def startWebServer(
+ config: Configuration,
+ jobManager: ActorRef,
+ archiver: ActorRef)
+ : Option[WebMonitor] = {
+ if(
+ config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false) &&
+ config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+
+ val lookupTimeout = AkkaUtils.getLookupTimeout(config)
+
+ val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
+ val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID())
+
+ // start the job manager web frontend
+ val webServer = if (
+ config.getBoolean(
+ ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
+ false)) {
+
+ LOG.info("Starting NEW JobManger web frontend")
+ // start the new web frontend. we need to load this dynamically
+ // because it is not in the same project/dependencies
+ JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway)
+ }
+ else {
+ LOG.info("Starting JobManger web frontend")
+ new WebInfoServer(config, jobManagerGateway, archiverGateway)
+ }
+
+ webServer.start()
+
+ Option(webServer)
+ } else {
+ None
+ }
+ }
+
def waitForTaskManagersToBeRegistered(): Unit = {
implicit val executionContext = ExecutionContext.global
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index c056b63..54c457e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -23,12 +23,15 @@ import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.JobClient
+import org.apache.flink.runtime.instance.AkkaActorGateway
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.webmonitor.WebMonitor
import org.slf4j.LoggerFactory
@@ -42,9 +45,10 @@ import org.slf4j.LoggerFactory
* @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
* [[ActorSystem]], otherwise false
*/
-class LocalFlinkMiniCluster(userConfiguration: Configuration,
- singleActorSystem: Boolean,
- streamingMode: StreamingMode)
+class LocalFlinkMiniCluster(
+ userConfiguration: Configuration,
+ singleActorSystem: Boolean,
+ streamingMode: StreamingMode)
extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
@@ -74,23 +78,14 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
config
}
- override def startJobManager(system: ActorSystem): ActorRef = {
+ override def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor]) = {
val config = configuration.clone()
val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode)
-
- if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) {
- if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
- // new web frontend
- JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archiver)
- }
- else {
- // old web frontend
- val webServer = new WebInfoServer(configuration, jobManager, archiver)
- webServer.start()
- }
- }
- jobManager
+
+ val webMonitorOption = startWebServer(config, jobManager, archiver)
+
+ (jobManager, webMonitorOption)
}
override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
@@ -125,13 +120,15 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
None
}
- TaskManager.startTaskManagerComponentsAndActor(config, system,
- hostname, // network interface to bind to
- Some(taskManagerActorName), // actor name
- jobManagerPath, // job manager akka URL
- localExecution, // start network stack?
- streamingMode,
- classOf[TaskManager])
+ TaskManager.startTaskManagerComponentsAndActor(
+ config,
+ system,
+ hostname, // network interface to bind to
+ Some(taskManagerActorName), // actor name
+ jobManagerPath, // job manager akka URL
+ localExecution, // start network stack?
+ streamingMode,
+ classOf[TaskManager])
}
def getJobClientActorSystem: ActorSystem = jobClientActorSystem
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f974946..0ec1040 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -171,7 +171,7 @@ class TaskManager(
protected var leaderSessionID: Option[UUID] = None
- private var currentRegistrationSessionID: UUID = UUID.randomUUID()
+ private val currentRegistrationSessionID: UUID = UUID.randomUUID()
// --------------------------------------------------------------------------
// Actor messages and life cycle
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index ce0ef8d..f5a506d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.webmonitor.WebMonitor
/**
* Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors with testing support
@@ -67,7 +68,7 @@ class TestingCluster(userConfiguration: Configuration,
cfg
}
- override def startJobManager(actorSystem: ActorSystem): ActorRef = {
+ override def startJobManager(actorSystem: ActorSystem): (ActorRef, Option[WebMonitor]) = {
val (executionContext,
instanceManager,
@@ -103,7 +104,7 @@ class TestingCluster(userConfiguration: Configuration,
jobManagerProps
}
- actorSystem.actorOf(dispatcherJobManagerProps, JobManager.JOB_MANAGER_NAME)
+ (actorSystem.actorOf(dispatcherJobManagerProps, JobManager.JOB_MANAGER_NAME), None)
}
override def startTaskManager(index: Int, system: ActorSystem) = {
@@ -116,12 +117,14 @@ class TestingCluster(userConfiguration: Configuration,
None
}
- TaskManager.startTaskManagerComponentsAndActor(configuration, system,
- hostname,
- Some(tmActorName),
- jobManagerPath,
- numTaskManagers == 1,
- streamingMode,
- classOf[TestingTaskManager])
+ TaskManager.startTaskManagerComponentsAndActor(
+ configuration,
+ system,
+ hostname,
+ Some(tmActorName),
+ jobManagerPath,
+ numTaskManagers == 1,
+ streamingMode,
+ classOf[TestingTaskManager])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index cdf3960..e83c7a6 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -22,12 +22,15 @@ import akka.actor.{Props, ActorRef, ActorSystem}
import akka.pattern.Patterns._
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.AkkaActorGateway
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingJobManager,
TestingMemoryArchivist, TestingTaskManager}
+import org.apache.flink.runtime.webmonitor.WebMonitor
import scala.concurrent.Await
@@ -40,9 +43,10 @@ import scala.concurrent.Await
* @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
* same [[ActorSystem]], otherwise false.
*/
-class ForkableFlinkMiniCluster(userConfiguration: Configuration,
- singleActorSystem: Boolean,
- streamingMode: StreamingMode)
+class ForkableFlinkMiniCluster(
+ userConfiguration: Configuration,
+ singleActorSystem: Boolean,
+ streamingMode: StreamingMode)
extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
@@ -78,7 +82,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
super.generateConfiguration(config)
}
- override def startJobManager(actorSystem: ActorSystem): ActorRef = {
+ override def startJobManager(actorSystem: ActorSystem): (ActorRef, Option[WebMonitor]) = {
val (executionContext,
instanceManager,
@@ -95,7 +99,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
archiveCount)
with TestingMemoryArchivist)
- val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
+ val archiver = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
val jobManagerProps = Props(
new JobManager(
@@ -104,7 +108,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
instanceManager,
scheduler,
libraryCacheManager,
- archive,
+ archiver,
executionRetries,
delayBetweenRetries,
timeout,
@@ -113,21 +117,9 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
- if (userConfiguration.getBoolean(
- ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false))
- {
- if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
- // new web frontend
- JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archive)
- }
- else {
- // old web frontend
- val webServer = new WebInfoServer(configuration, jobManager, archive)
- webServer.start()
- }
- }
+ val webMonitorOption = startWebServer(configuration, jobManager, archiver)
- jobManager
+ (jobManager, webMonitorOption)
}
override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
@@ -163,11 +155,18 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
val stopped = gracefulStop(jobManagerActor, TestingUtils.TESTING_DURATION)
Await.result(stopped, TestingUtils.TESTING_DURATION)
+ webMonitor foreach {
+ _.stop()
+ }
+
jobManagerActorSystem.shutdown()
jobManagerActorSystem.awaitTermination()
jobManagerActorSystem = startJobManagerActorSystem()
- jobManagerActor = startJobManager(jobManagerActorSystem)
+ val (newJobManagerActor, newWebMonitor) = startJobManager(jobManagerActorSystem)
+
+ jobManagerActor = newJobManagerActor
+ webMonitor = newWebMonitor
}
def restartTaskManager(index: Int): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/fab61a19/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index c497a90..9e0c976 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -26,9 +26,11 @@ import org.apache.flink.client.CliFrontend
import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.instance.AkkaActorGateway
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.webmonitor.WebMonitor
import org.apache.flink.yarn.Messages.StartYarnSession
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -68,7 +70,7 @@ object ApplicationMaster {
override def run(): Object = {
var actorSystem: ActorSystem = null
- var webserver: WebInfoServer = null
+ var webserver: WebMonitor = null
try {
val conf = new YarnConfiguration()
@@ -99,25 +101,44 @@ object ApplicationMaster {
val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt
val dynamicPropertiesEncodedString = env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES)
- val (config: Configuration,
- system: ActorSystem,
- jobManager: ActorRef,
- archiver: ActorRef) = startJobManager(currDir, ownHostname,
- dynamicPropertiesEncodedString,
- streamingMode)
+ val config = createConfiguration(currDir, dynamicPropertiesEncodedString)
+
+ val (
+ system: ActorSystem,
+ jobManager: ActorRef,
+ archiver: ActorRef) = startJobManager(
+ config,
+ ownHostname,
+ streamingMode)
+
actorSystem = system
val extActor = system.asInstanceOf[ExtendedActorSystem]
val jobManagerPort = extActor.provider.getDefaultAddress.port.get
- // start the web info server
if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
+ // start the web info server
+ val lookupTimeout = AkkaUtils.getLookupTimeout(config)
+ val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
+ val archiverGateway = new AkkaActorGateway(
+ archiver,
+ jobManagerGateway.leaderSessionID())
+
LOG.info("Starting Job Manger web frontend.")
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
// set JobManager host/port for web interface.
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ownHostname)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort)
- webserver = new WebInfoServer(config, jobManager, archiver)
+
+ webserver = if(
+ config.getBoolean(
+ ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
+ false)) {
+ JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway)
+ } else {
+ new WebInfoServer(config, jobManagerGateway, archiverGateway)
+ }
+
webserver.start()
}
@@ -160,11 +181,17 @@ object ApplicationMaster {
}
- def generateConfigurationFile(fileName: String, currDir: String, ownHostname: String,
- jobManagerPort: Int,
- jobManagerWebPort: Int, logDirs: String, slots: Int,
- taskManagerCount: Int, dynamicPropertiesEncodedString: String)
- : Unit = {
+ def generateConfigurationFile(
+ fileName: String,
+ currDir: String,
+ ownHostname: String,
+ jobManagerPort: Int,
+ jobManagerWebPort: Int,
+ logDirs: String,
+ slots: Int,
+ taskManagerCount: Int,
+ dynamicPropertiesEncodedString: String)
+ : Unit = {
LOG.info("Generate configuration file for application master.")
val output = new PrintWriter(new BufferedWriter(
new FileWriter(fileName))
@@ -208,26 +235,13 @@ object ApplicationMaster {
*
* @return (Configuration, JobManager ActorSystem, JobManager ActorRef, Archiver ActorRef)
*/
- def startJobManager(currDir: String,
- hostname: String,
- dynamicPropertiesEncodedString: String,
- streamingMode: StreamingMode):
- (Configuration, ActorSystem, ActorRef, ActorRef) = {
+ def startJobManager(
+ configuration: Configuration,
+ hostname: String,
+ streamingMode: StreamingMode)
+ : (ActorSystem, ActorRef, ActorRef) = {
LOG.info("Starting JobManager for YARN")
- LOG.info(s"Loading config from: $currDir.")
-
- GlobalConfiguration.loadConfiguration(currDir)
- val configuration = GlobalConfiguration.getConfiguration()
-
- configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir)
-
- // add dynamic properties to JobManager configuration.
- val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
- import scala.collection.JavaConverters._
- for(property <- dynamicProperties.asScala){
- configuration.setString(property.f0, property.f1)
- }
// set port to 0 to let Akka automatically determine the port.
LOG.debug("Starting JobManager actor system")
@@ -265,7 +279,25 @@ object ApplicationMaster {
LOG.debug("Starting JobManager actor")
val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)
- (configuration, jobManagerSystem, jobManager, archiver)
+ (jobManagerSystem, jobManager, archiver)
+ }
+
+ def createConfiguration(curDir: String, dynamicPropertiesEncodedString: String): Configuration = {
+ LOG.info(s"Loading config from: $curDir.")
+
+ GlobalConfiguration.loadConfiguration(curDir)
+ val configuration = GlobalConfiguration.getConfiguration()
+
+ configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, curDir)
+
+ // add dynamic properties to JobManager configuration.
+ val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
+ import scala.collection.JavaConverters._
+ for(property <- dynamicProperties.asScala){
+ configuration.setString(property.f0, property.f1)
+ }
+
+ configuration
}