You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/21 21:10:51 UTC
[12/26] flink git commit: [FLINK-2358] [dashboard] First part
dashboard server backend
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 8d79c45..a414cf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobmanager.web;
import java.io.File;
@@ -25,22 +24,21 @@ import java.io.IOException;
import java.net.URL;
import akka.actor.ActorRef;
+
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.handler.ResourceHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.eclipse.jetty.http.security.Constraint;
-import org.eclipse.jetty.security.ConstraintMapping;
-import org.eclipse.jetty.security.ConstraintSecurityHandler;
-import org.eclipse.jetty.security.HashLoginService;
-import org.eclipse.jetty.security.authentication.BasicAuthenticator;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import scala.concurrent.duration.FiniteDuration;
/**
@@ -49,25 +47,17 @@ import scala.concurrent.duration.FiniteDuration;
*/
public class WebInfoServer {
- /**
- * Web root dir in the jar
- */
+ /** Web root dir in the jar */
private static final String WEB_ROOT_DIR = "web-docs-infoserver";
- /**
- * The log for this class.
- */
+ /** The log for this class. */
private static final Logger LOG = LoggerFactory.getLogger(WebInfoServer.class);
- /**
- * The jetty server serving all requests.
- */
+ /** The jetty server serving all requests. */
private final Server server;
- /**
- * The assigned port where jetty is running.
- */
- private int assignedPort;
+ /** The assigned port where jetty is running. */
+ private int assignedPort = -1;
/**
* Creates a new web info server. The server runs the servlets that implement the logic
@@ -126,11 +116,9 @@ public class WebInfoServer {
// ----- the handlers for the servlets -----
ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
servletContext.setContextPath("/");
- servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager,
- archive, timeout)), "/jobsInfo");
+ servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager, archive, timeout)), "/jobsInfo");
servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
- servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)),
- "/setupInfo");
+ servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)), "/setupInfo");
servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");
@@ -141,47 +129,9 @@ public class WebInfoServer {
// ----- add the handlers to the list handler -----
HandlerList handlers = new HandlerList();
- handlers.addHandler(servletContext);
handlers.addHandler(resourceHandler);
-
- // ----- create the login module with http authentication -----
-
- File af = null;
- String authFile = config.getString(ConfigConstants.JOB_MANAGER_WEB_ACCESS_FILE_KEY, null);
- if (authFile != null) {
- af = new File(authFile);
- if (!af.exists()) {
- LOG.error("The specified file '" + af.getAbsolutePath()
- + "' with the authentication information is missing. Starting server without HTTP authentication.");
- af = null;
- }
- }
- if (af != null) {
- HashLoginService loginService = new HashLoginService("Flink Jobmanager Interface", authFile);
- server.addBean(loginService);
-
- Constraint constraint = new Constraint();
- constraint.setName(Constraint.__BASIC_AUTH);
- constraint.setAuthenticate(true);
- constraint.setRoles(new String[] { "user" });
-
- ConstraintMapping mapping = new ConstraintMapping();
- mapping.setPathSpec("/*");
- mapping.setConstraint(constraint);
-
- ConstraintSecurityHandler sh = new ConstraintSecurityHandler();
- sh.addConstraintMapping(mapping);
- sh.setAuthenticator(new BasicAuthenticator());
- sh.setLoginService(loginService);
- sh.setStrict(true);
-
- // set the handers: the server hands the request to the security handler,
- // which hands the request to the other handlers when authenticated
- sh.setHandler(handlers);
- server.setHandler(sh);
- } else {
- server.setHandler(handlers);
- }
+ handlers.addHandler(servletContext);
+ server.setHandler(handlers);
}
/**
@@ -192,13 +142,22 @@ public class WebInfoServer {
*/
public void start() throws Exception {
server.start();
- final Connector connector = server.getConnectors()[0];
- assignedPort = connector.getLocalPort(); // we have to use getLocalPort() instead of getPort() http://stackoverflow.com/questions/8884865/how-to-discover-jetty-7-running-port
- String host = connector.getHost();
- if(host == null) { // as per method documentation
- host = "0.0.0.0";
+
+ final Connector[] connectors = server.getConnectors();
+ if (connectors != null && connectors.length > 0) {
+ Connector conn = connectors[0];
+
+ // we have to use getLocalPort() instead of getPort() http://stackoverflow.com/questions/8884865/how-to-discover-jetty-7-running-port
+ this.assignedPort = conn.getLocalPort();
+ String host = conn.getHost();
+ if (host == null) { // as per method documentation
+ host = "0.0.0.0";
+ }
+ LOG.info("Started web info server for JobManager on {}:{}", host, assignedPort);
+ }
+ else {
+ LOG.warn("Unable to determine local endpoint of web frontend server");
}
- LOG.info("Started web info server for JobManager on {}:{}", host, assignedPort);
}
/**
@@ -206,7 +165,7 @@ public class WebInfoServer {
*/
public void stop() throws Exception {
server.stop();
- assignedPort = 0;
+ assignedPort = -1;
}
public int getServerPort() {
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/InfoMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/InfoMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/InfoMessage.java
new file mode 100644
index 0000000..a3013bb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/InfoMessage.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * Abstract base interface of all info messages exchanged between the
+ * JobManager an for example the runtime monitor.
+ */
+public interface InfoMessage extends java.io.Serializable {}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
new file mode 100644
index 0000000..084e97d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * An overview of how many jobs are in which status.
+ */
+public class JobsOverview implements InfoMessage {
+
+ private static final long serialVersionUID = -3699051943490133183L;
+
+ private final int numJobsRunningOrPending;
+ private final int numJobsFinished;
+ private final int numJobsCancelled;
+ private final int numJobsFailed;
+
+ public JobsOverview(int numJobsRunningOrPending, int numJobsFinished,
+ int numJobsCancelled, int numJobsFailed) {
+
+ this.numJobsRunningOrPending = numJobsRunningOrPending;
+ this.numJobsFinished = numJobsFinished;
+ this.numJobsCancelled = numJobsCancelled;
+ this.numJobsFailed = numJobsFailed;
+ }
+
+ public JobsOverview(JobsOverview first, JobsOverview second) {
+ this.numJobsRunningOrPending = first.numJobsRunningOrPending + second.numJobsRunningOrPending;
+ this.numJobsFinished = first.numJobsFinished + second.numJobsFinished;
+ this.numJobsCancelled = first.numJobsCancelled + second.numJobsCancelled;
+ this.numJobsFailed = first.numJobsFailed + second.numJobsFailed;
+ }
+
+ public int getNumJobsRunningOrPending() {
+ return numJobsRunningOrPending;
+ }
+
+ public int getNumJobsFinished() {
+ return numJobsFinished;
+ }
+
+ public int getNumJobsCancelled() {
+ return numJobsCancelled;
+ }
+
+ public int getNumJobsFailed() {
+ return numJobsFailed;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ else if (obj instanceof JobsOverview) {
+ JobsOverview that = (JobsOverview) obj;
+ return this.numJobsRunningOrPending == that.numJobsRunningOrPending &&
+ this.numJobsFinished == that.numJobsFinished &&
+ this.numJobsCancelled == that.numJobsCancelled &&
+ this.numJobsFailed == that.numJobsFailed;
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = numJobsRunningOrPending;
+ result = 31 * result + numJobsFinished;
+ result = 31 * result + numJobsCancelled;
+ result = 31 * result + numJobsFailed;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "JobsOverview {" +
+ "numJobsRunningOrPending=" + numJobsRunningOrPending +
+ ", numJobsFinished=" + numJobsFinished +
+ ", numJobsCancelled=" + numJobsCancelled +
+ ", numJobsFailed=" + numJobsFailed +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
new file mode 100644
index 0000000..a261c2a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An overview of how many jobs are in which status.
+ */
+public class JobsWithIDsOverview implements InfoMessage {
+
+ private static final long serialVersionUID = -3699051943490133183L;
+
+ private final List<JobID> jobsRunningOrPending;
+ private final List<JobID> jobsFinished;
+ private final List<JobID> jobsCancelled;
+ private final List<JobID> jobsFailed;
+
+ public JobsWithIDsOverview(List<JobID> jobsRunningOrPending, List<JobID> jobsFinished,
+ List<JobID> jobsCancelled, List<JobID> jobsFailed) {
+
+ this.jobsRunningOrPending = jobsRunningOrPending;
+ this.jobsFinished = jobsFinished;
+ this.jobsCancelled = jobsCancelled;
+ this.jobsFailed = jobsFailed;
+ }
+
+ public JobsWithIDsOverview(JobsWithIDsOverview first, JobsWithIDsOverview second) {
+ this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
+ this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
+ this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
+ this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
+ }
+
+ public List<JobID> getJobsRunningOrPending() {
+ return jobsRunningOrPending;
+ }
+
+ public List<JobID> getJobsFinished() {
+ return jobsFinished;
+ }
+
+ public List<JobID> getJobsCancelled() {
+ return jobsCancelled;
+ }
+
+ public List<JobID> getJobsFailed() {
+ return jobsFailed;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "JobsOverview {" +
+ "numJobsRunningOrPending=" + jobsRunningOrPending +
+ ", numJobsFinished=" + jobsFinished +
+ ", numJobsCancelled=" + jobsCancelled +
+ ", numJobsFailed=" + jobsFailed +
+ '}';
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static ArrayList<JobID> combine(List<JobID> first, List<JobID> second) {
+ ArrayList<JobID> result = new ArrayList<JobID>(first.size() + second.size());
+ result.addAll(first);
+ result.addAll(second);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsOverview.java
new file mode 100644
index 0000000..651bfe3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsOverview.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * This message requests an overview of how many jobs are in which status.
+ * The response to this message is a {@link JobsOverview} message.
+ */
+public class RequestJobsOverview implements InfoMessage {
+
+ private static final long serialVersionUID = 3052933564788843275L;
+
+ // ------------------------------------------------------------------------
+
+ private static final RequestJobsOverview INSTANCE = new RequestJobsOverview();
+
+ public static RequestJobsOverview getInstance() {
+ return INSTANCE;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return RequestJobsOverview.class.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj != null && obj.getClass() == RequestJobsOverview.class;
+ }
+
+ @Override
+ public String toString() {
+ return RequestJobsOverview.class.getSimpleName();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * No external instantiation
+ */
+ private RequestJobsOverview() {}
+
+ /**
+ * Preserve the singleton property by returning the singleton instance
+ */
+ private Object readResolve() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsWithIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsWithIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsWithIDsOverview.java
new file mode 100644
index 0000000..352dfb5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsWithIDsOverview.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * This message requests an overview of how many jobs are in which status.
+ * The response to this message is a {@link org.apache.flink.runtime.messages.webmonitor.JobsOverview} message.
+ */
+public class RequestJobsWithIDsOverview implements InfoMessage {
+
+ private static final long serialVersionUID = 3052933564788843275L;
+
+ // ------------------------------------------------------------------------
+
+ private static final RequestJobsWithIDsOverview INSTANCE = new RequestJobsWithIDsOverview();
+
+ public static RequestJobsWithIDsOverview getInstance() {
+ return INSTANCE;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return RequestJobsWithIDsOverview.class.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj != null && obj.getClass() == RequestJobsWithIDsOverview.class;
+ }
+
+ @Override
+ public String toString() {
+ return RequestJobsWithIDsOverview.class.getSimpleName();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * No external instantiation
+ */
+ private RequestJobsWithIDsOverview() {}
+
+ /**
+ * Preserve the singleton property by returning the singleton instance
+ */
+ private Object readResolve() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
new file mode 100644
index 0000000..c3797fd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * This message requests an overview of the status, such as how many TaskManagers
+ * are currently connected, how many slots are available, how many are free, ...
+ * The response to this message is a {@link StatusOverview} message.
+ */
+public class RequestStatusOverview implements InfoMessage {
+
+ private static final long serialVersionUID = 3052933564788843275L;
+
+ // ------------------------------------------------------------------------
+
+ private static final RequestStatusOverview INSTANCE = new RequestStatusOverview();
+
+ public static RequestStatusOverview getInstance() {
+ return INSTANCE;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return RequestStatusOverview.class.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj != null && obj.getClass() == RequestStatusOverview.class;
+ }
+
+ @Override
+ public String toString() {
+ return RequestStatusOverview.class.getSimpleName();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * No external instantiation
+ */
+ private RequestStatusOverview() {}
+
+ /**
+ * Preserve the singleton property by returning the singleton instance
+ */
+ private Object readResolve() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java
new file mode 100644
index 0000000..d26fe3d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * This message requests an overview of the status, such as how many TaskManagers
+ * are currently connected, how many slots are available, how many are free, ...
+ * The response to this message is a {@link org.apache.flink.runtime.messages.webmonitor.StatusOverview} message.
+ */
+public class RequestStatusWithJobIDsOverview implements InfoMessage {
+
+ private static final long serialVersionUID = 3052933564788843275L;
+
+ // ------------------------------------------------------------------------
+
+ private static final RequestStatusWithJobIDsOverview INSTANCE = new RequestStatusWithJobIDsOverview();
+
+ public static RequestStatusWithJobIDsOverview getInstance() {
+ return INSTANCE;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return RequestStatusWithJobIDsOverview.class.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj != null && obj.getClass() == RequestStatusWithJobIDsOverview.class;
+ }
+
+ @Override
+ public String toString() {
+ return RequestStatusWithJobIDsOverview.class.getSimpleName();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * No external instantiation
+ */
+ private RequestStatusWithJobIDsOverview() {}
+
+ /**
+ * Preserve the singleton property by returning the singleton instance
+ */
+ private Object readResolve() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
new file mode 100644
index 0000000..214141e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * Response to the {@link RequestStatusOverview} message, carrying a description
+ * of the Flink cluster status.
+ */
+public class StatusOverview extends JobsOverview {
+
+ private static final long serialVersionUID = -729861859715105265L;
+
+ private final int numTaskManagersConnected;
+ private final int numSlotsTotal;
+ private final int numSlotsAvailable;
+
+ public StatusOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
+ int numJobsRunningOrPending, int numJobsFinished, int numJobsCancelled, int numJobsFailed) {
+
+ super(numJobsRunningOrPending, numJobsFinished, numJobsCancelled, numJobsFailed);
+
+ this.numTaskManagersConnected = numTaskManagersConnected;
+ this.numSlotsTotal = numSlotsTotal;
+ this.numSlotsAvailable = numSlotsAvailable;
+ }
+
+ public StatusOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
+ JobsOverview jobs1, JobsOverview jobs2) {
+ super(jobs1, jobs2);
+ this.numTaskManagersConnected = numTaskManagersConnected;
+ this.numSlotsTotal = numSlotsTotal;
+ this.numSlotsAvailable = numSlotsAvailable;
+ }
+
+ public int getNumTaskManagersConnected() {
+ return numTaskManagersConnected;
+ }
+
+ public int getNumSlotsTotal() {
+ return numSlotsTotal;
+ }
+
+ public int getNumSlotsAvailable() {
+ return numSlotsAvailable;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ else if (obj instanceof StatusOverview) {
+ StatusOverview that = (StatusOverview) obj;
+ return this.numTaskManagersConnected == that.numTaskManagersConnected &&
+ this.numSlotsTotal == that.numSlotsTotal &&
+ this.numSlotsAvailable == that.numSlotsAvailable &&
+ this.getNumJobsRunningOrPending() == that.getNumJobsRunningOrPending() &&
+ this.getNumJobsFinished() == that.getNumJobsFinished() &&
+ this.getNumJobsCancelled() == that.getNumJobsCancelled() &&
+ this.getNumJobsFailed() == that.getNumJobsFailed();
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + numTaskManagersConnected;
+ result = 31 * result + numSlotsTotal;
+ result = 31 * result + numSlotsAvailable;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "StatusOverview {" +
+ "numTaskManagersConnected=" + numTaskManagersConnected +
+ ", numSlotsTotal=" + numSlotsTotal +
+ ", numSlotsAvailable=" + numSlotsAvailable +
+ ", numJobsRunningOrPending=" + getNumJobsRunningOrPending() +
+ ", numJobsFinished=" + getNumJobsFinished() +
+ ", numJobsCancelled=" + getNumJobsCancelled() +
+ ", numJobsFailed=" + getNumJobsFailed() +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java
new file mode 100644
index 0000000..72fb01b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.List;
+
+/**
+ * Response to the {@link org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview}
+ * message, carrying a description of the Flink cluster status.
+ */
+public class StatusWithJobIDsOverview extends JobsWithIDsOverview {
+
+ private static final long serialVersionUID = -729861859715105265L;
+
+ private final int numTaskManagersConnected;
+ private final int numSlotsTotal;
+ private final int numSlotsAvailable;
+
+ public StatusWithJobIDsOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
+ List<JobID> jobsRunningOrPending, List<JobID> jobsFinished,
+ List<JobID> jobsCancelled, List<JobID> jobsFailed) {
+
+ super(jobsRunningOrPending, jobsFinished, jobsCancelled, jobsFailed);
+
+ this.numTaskManagersConnected = numTaskManagersConnected;
+ this.numSlotsTotal = numSlotsTotal;
+ this.numSlotsAvailable = numSlotsAvailable;
+ }
+
+ public StatusWithJobIDsOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
+ JobsWithIDsOverview jobs1, JobsWithIDsOverview jobs2) {
+ super(jobs1, jobs2);
+ this.numTaskManagersConnected = numTaskManagersConnected;
+ this.numSlotsTotal = numSlotsTotal;
+ this.numSlotsAvailable = numSlotsAvailable;
+ }
+
+ public int getNumTaskManagersConnected() {
+ return numTaskManagersConnected;
+ }
+
+ public int getNumSlotsTotal() {
+ return numSlotsTotal;
+ }
+
+ public int getNumSlotsAvailable() {
+ return numSlotsAvailable;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "StatusOverview {" +
+ "numTaskManagersConnected=" + numTaskManagersConnected +
+ ", numSlotsTotal=" + numSlotsTotal +
+ ", numSlotsAvailable=" + numSlotsAvailable +
+ ", numJobsRunningOrPending=" + getJobsRunningOrPending() +
+ ", numJobsFinished=" + getJobsFinished() +
+ ", numJobsCancelled=" + getJobsCancelled() +
+ ", numJobsFailed=" + getJobsFailed() +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/package-info.java
new file mode 100644
index 0000000..730cf33
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the actor messages that are sent between the
+ * JobManager and components that are interested in the status of
+ * the JobManager. An example for such a component is the web
+ * runtime monitor, which sends messages to request the status.
+ */
+package org.apache.flink.runtime.messages.webmonitor;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java
new file mode 100644
index 0000000..c2e2b0d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+/**
+ * Interface for web monitors. Defines life-cycle methods and properties.
+ */
+public interface WebMonitor {
+
+ /**
+ * Starts the web monitor.
+ *
+ * @throws Exception This method may forward exceptions, if it cannot bring up the web monitor.
+ */
+ void start() throws Exception;
+
+ /**
+ * Stops the web server.
+ *
+ * @throws Exception This method may forward exceptions, if it cannot properly stop the web monitor.
+ */
+ void stop() throws Exception;
+
+ /**
+ * Gets the port that the web server actually binds to. If port 0 was given in
+ * the configuration, then a random free port will be picked. This method can
+ * be used to determine this port.
+ *
+ * @return The port where the web server is listening, or -1, if no server is running.
+ */
+ int getServerPort();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5cf69ec..d8d51ce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,12 +19,16 @@
package org.apache.flink.runtime.jobmanager
import java.io.{File, IOException}
+import java.lang.reflect.{InvocationTargetException, Constructor}
import java.net.InetSocketAddress
import java.util.Collections
import akka.actor.Status.{Failure, Success}
import akka.actor._
+import _root_.akka.pattern.ask
+
import grizzled.slf4j.Logger
+
import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.core.io.InputSplitAssigner
@@ -39,12 +43,14 @@ import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
+import org.apache.flink.runtime.messages.webmonitor._
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.ZooKeeperUtil
import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
+import org.apache.flink.runtime.webmonitor.WebMonitor
import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -411,9 +417,11 @@ class JobManager(
case message: AccumulatorMessage => handleAccumulatorMessage(message)
+ case message: InfoMessage => handleInfoRequestMessage(message, sender())
+
case RequestStackTrace(instanceID) =>
val gateway = instanceManager.getRegisteredInstanceById(instanceID).getInstanceGateway
- gateway.forward(SendStackTrace, sender)
+ gateway.forward(SendStackTrace, sender())
case Terminated(taskManager) =>
if (instanceManager.isRegistered(taskManager)) {
@@ -502,8 +510,9 @@ class JobManager(
}
executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
- executionGraph.setScheduleMode(jobGraph.getScheduleMode)
- executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
+ executionGraph.setScheduleMode(jobGraph.getScheduleMode())
+ executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
+ executionGraph.setJsonPlan(jobGraph.getJsonPlan())
// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
@@ -737,6 +746,115 @@ class JobManager(
}
/**
+ * Dedicated handler for monitor info request messages.
+ *
+ * @param actorMessage The info request message.
+ */
+ private def handleInfoRequestMessage(actorMessage: InfoMessage, theSender: ActorRef): Unit = {
+ try {
+ actorMessage match {
+
+ case _ : RequestJobsOverview =>
+ // get our own overview
+ val ourJobs = createJobStatusOverview()
+
+ // get the overview from the archive
+ val future = (archive ? RequestJobsOverview.getInstance())(timeout)
+
+ future.onSuccess {
+ case archiveOverview: JobsOverview =>
+ theSender ! new JobsOverview(ourJobs, archiveOverview)
+ }(context.dispatcher)
+
+ case _ : RequestJobsWithIDsOverview =>
+ // get our own overview
+ val ourJobs = createJobStatusWithIDsOverview()
+
+ // get the overview from the archive
+ val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
+
+ future.onSuccess {
+ case archiveOverview: JobsWithIDsOverview =>
+ theSender ! new JobsWithIDsOverview(ourJobs, archiveOverview)
+ }(context.dispatcher)
+
+ case _ : RequestStatusOverview =>
+
+ val ourJobs = createJobStatusOverview()
+
+ val numTMs = instanceManager.getNumberOfRegisteredTaskManagers()
+ val numSlotsTotal = instanceManager.getTotalNumberOfSlots()
+ val numSlotsAvailable = instanceManager.getNumberOfAvailableSlots()
+
+ // add to that the jobs from the archive
+ val future = (archive ? RequestJobsOverview.getInstance())(timeout)
+ future.onSuccess {
+ case archiveOverview: JobsOverview =>
+ theSender ! new StatusOverview(numTMs, numSlotsTotal, numSlotsAvailable,
+ ourJobs, archiveOverview)
+ }(context.dispatcher)
+
+ case _ : RequestStatusWithJobIDsOverview =>
+
+ val ourJobs = createJobStatusWithIDsOverview()
+
+ val numTMs = instanceManager.getNumberOfRegisteredTaskManagers()
+ val numSlotsTotal = instanceManager.getTotalNumberOfSlots()
+ val numSlotsAvailable = instanceManager.getNumberOfAvailableSlots()
+
+ // add to that the jobs from the archive
+ val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
+ future.onSuccess {
+ case archiveOverview: JobsWithIDsOverview =>
+ theSender ! new StatusWithJobIDsOverview(numTMs, numSlotsTotal, numSlotsAvailable,
+ ourJobs, archiveOverview)
+ }(context.dispatcher)
+
+ case _ => throw new Exception("Unrecognized info message " + actorMessage)
+ }
+ }
+ catch {
+ case e: Throwable => log.error(s"Error responding to message $actorMessage", e)
+ }
+ }
+
+ private def createJobStatusOverview() : JobsOverview = {
+ var runningOrPending = 0
+ var finished = 0
+ var canceled = 0
+ var failed = 0
+
+ currentJobs.values.foreach {
+ _._1.getState() match {
+ case JobStatus.FINISHED => finished += 1
+ case JobStatus.CANCELED => canceled += 1
+ case JobStatus.FAILED => failed += 1
+ case _ => runningOrPending += 1
+ }
+ }
+
+ new JobsOverview(runningOrPending, finished, canceled, failed)
+ }
+
+ private def createJobStatusWithIDsOverview() : JobsWithIDsOverview = {
+ val runningOrPending = new java.util.ArrayList[JobID]()
+ val finished = new java.util.ArrayList[JobID]()
+ val canceled = new java.util.ArrayList[JobID]()
+ val failed = new java.util.ArrayList[JobID]()
+
+ currentJobs.values.foreach { case (graph, _) =>
+ graph.getState() match {
+ case JobStatus.FINISHED => finished.add(graph.getJobID)
+ case JobStatus.CANCELED => canceled.add(graph.getJobID)
+ case JobStatus.FAILED => failed.add(graph.getJobID)
+ case _ => runningOrPending.add(graph.getJobID)
+ }
+ }
+
+ new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+ }
+
+ /**
* Removes the job and sends it to the MemoryArchivist
* @param jobID ID of the job to remove and archive
*/
@@ -948,7 +1066,17 @@ object JobManager {
}
// start the job manager web frontend
- if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
+ if (configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
+ LOG.info("Starting NEW JobManger web frontend")
+
+ // start the new web frontend. we need to load this dynamically
+ // because it is not in the same project/dependencies
+ startWebRuntimeMonitor(configuration, jobManager, archiver)
+
+ // for the time being, we need to start both web servers
+ new WebInfoServer(configuration, jobManager, archiver).start()
+ }
+ else if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
LOG.info("Starting JobManger web frontend")
val webServer = new WebInfoServer(configuration, jobManager, archiver)
webServer.start()
@@ -1331,4 +1459,59 @@ object JobManager {
val timeout = AkkaUtils.getLookupTimeout(config)
getJobManagerRemoteReference(address, system, timeout)
}
+
+ // --------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------
+
+ /**
+ * Starts the web runtime monitor. Because the actual implementation of the
+ * runtime monitor is in another project, we load the runtime monitor dynamically.
+ *
+ * Because failure to start the web runtime monitor is not considered fatal,
+ * this method does not throw any exceptions, but only logs them.
+ *
+ * @param config The configuration for the runtime monitor.
+ * @param jobManager The JobManager actor.
+ * @param archiver The execution graph archive actor.
+ */
+ def startWebRuntimeMonitor(config: Configuration,
+ jobManager: ActorRef,
+ archiver: ActorRef): Unit = {
+ // try to load and instantiate the class
+ val monitor: WebMonitor =
+ try {
+ val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
+ val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
+ .asSubclass(classOf[WebMonitor])
+
+ val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration],
+ classOf[ActorRef],
+ classOf[ActorRef])
+ ctor.newInstance(config, jobManager, archiver)
+ }
+ catch {
+ case e: ClassNotFoundException =>
+ LOG.error("Could not load web runtime monitor. " +
+ "Probably reason: flink-runtime-web is not in the classpath")
+ LOG.debug("Caught exception", e)
+ null
+ case e: InvocationTargetException =>
+ LOG.error("WebServer could not be created", e.getTargetException())
+ null
+ case t: Throwable =>
+ LOG.error("Failed to instantiate web runtime monitor.", t)
+ null
+ }
+
+ if (monitor != null) {
+ try {
+ monitor.start()
+ }
+ catch {
+ case e: Exception =>
+ LOG.error("Failed to start web runtime monitor", e)
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 54d2f2f..6d0b220 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -18,9 +18,13 @@
package org.apache.flink.runtime.jobmanager
+import java.util
+
import akka.actor.Actor
+
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.messages.webmonitor._
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.executiongraph.ExecutionGraph
import org.apache.flink.runtime.messages.ArchiveMessages._
@@ -100,6 +104,22 @@ class MemoryArchivist(private val max_entries: Int)
case RequestJobCounts =>
sender ! (finishedCnt, canceledCnt, failedCnt)
+
+ case _ : RequestJobsOverview =>
+ try {
+ sender ! createJobsOverview()
+ }
+ catch {
+ case t: Throwable => log.error("Exception while creating the jobs overview", t)
+ }
+
+ case _ : RequestJobsWithIDsOverview =>
+ try {
+ sender ! createJobsWithIDsOverview()
+ }
+ catch {
+ case t: Throwable => log.error("Exception while creating the jobs overview", t)
+ }
}
/**
@@ -110,6 +130,51 @@ class MemoryArchivist(private val max_entries: Int)
throw new RuntimeException("Received unknown message " + message)
}
+
+ // --------------------------------------------------------------------------
+ // Request Responses
+ // --------------------------------------------------------------------------
+
+ private def createJobsOverview() : JobsOverview = {
+ var runningOrPending = 0
+ var finished = 0
+ var canceled = 0
+ var failed = 0
+
+ graphs.values.foreach {
+ _.getState() match {
+ case JobStatus.FINISHED => finished += 1
+ case JobStatus.CANCELED => canceled += 1
+ case JobStatus.FAILED => failed += 1
+ case _ => runningOrPending += 1
+ }
+ }
+
+ new JobsOverview(runningOrPending, finished, canceled, failed)
+ }
+
+ private def createJobsWithIDsOverview() : JobsWithIDsOverview = {
+ val runningOrPending = new util.ArrayList[JobID]()
+ val finished = new util.ArrayList[JobID]()
+ val canceled = new util.ArrayList[JobID]()
+ val failed = new util.ArrayList[JobID]()
+
+ graphs.values.foreach { graph =>
+ graph.getState() match {
+ case JobStatus.FINISHED => finished.add(graph.getJobID)
+ case JobStatus.CANCELED => canceled.add(graph.getJobID)
+ case JobStatus.FAILED => failed.add(graph.getJobID)
+ case _ => runningOrPending.add(graph.getJobID)
+ }
+ }
+
+ new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+ }
+
+ // --------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------
+
/**
* Remove old ExecutionGraphs belonging to a jobID
* * if more than max_entries are in the queue.
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 424ee4c..2c28956 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -82,6 +82,10 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) {
val webServer = new WebInfoServer(configuration, jobManager, archiver)
webServer.start()
+
+ if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
+ JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archiver)
+ }
}
jobManager
}
@@ -170,7 +174,7 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
}
def setMemory(config: Configuration): Unit = {
- // set this only if no memory was preconfigured
+ // set this only if no memory was pre-configured
if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
val bufferSizeNew: Int = config.getInteger(
@@ -234,12 +238,5 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
}
object LocalFlinkMiniCluster {
- val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
-
- def main(args: Array[String]) {
- var conf = new Configuration;
- conf.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 4)
- conf.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true)
- var cluster = new LocalFlinkMiniCluster(conf, true)
- }
+// val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
index 8cf0d48..649fbbd 100644
--- a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
+++ b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
@@ -76,6 +76,94 @@ under the License.
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2+</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-rcm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml b/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml
index c3ed2bd..6e2c6b9 100644
--- a/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml
+++ b/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml
@@ -81,9 +81,89 @@ under the License.
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-rcm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -132,6 +212,90 @@ under the License.
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2+</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-rcm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -188,6 +352,94 @@ under the License.
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2+</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-rcm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -244,6 +496,94 @@ under the License.
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2+</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-rcm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -300,6 +640,94 @@ under the License.
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2+</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-rcm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-http-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index e108970..b1134ba 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -216,9 +216,7 @@ public class TestBaseUtils {
}
return readers;
}
-
-
-
+
public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
return getResultInputStream(resultPath, new String[]{});
}
@@ -480,9 +478,9 @@ public class TestBaseUtils {
cienv.putAll(newenv);
} catch (NoSuchFieldException e) {
try {
- Class[] classes = Collections.class.getDeclaredClasses();
+ Class<?>[] classes = Collections.class.getDeclaredClasses();
Map<String, String> env = System.getenv();
- for (Class cl : classes) {
+ for (Class<?> cl : classes) {
if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
Field field = cl.getDeclaredField("m");
field.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 509b86f..79e75d2 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -113,9 +113,15 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
- if (userConfiguration.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER,false)){
+ if (userConfiguration.getBoolean(
+ ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false))
+ {
val webServer = new WebInfoServer(configuration, jobManager, archive)
webServer.start()
+
+ if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
+ JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archive)
+ }
}
jobManager
http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1f63ff3..175c008 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@ under the License.
<module>flink-java</module>
<module>flink-scala</module>
<module>flink-runtime</module>
+ <module>flink-runtime-web</module>
<module>flink-optimizer</module>
<module>flink-examples</module>
<module>flink-clients</module>