You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/21 21:10:51 UTC

[12/26] flink git commit: [FLINK-2358] [dashboard] First part dashboard server backend

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 8d79c45..a414cf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager.web;
 
 import java.io.File;
@@ -25,22 +24,21 @@ import java.io.IOException;
 import java.net.URL;
 
 import akka.actor.ActorRef;
+
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.handler.ResourceHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.eclipse.jetty.http.security.Constraint;
-import org.eclipse.jetty.security.ConstraintMapping;
-import org.eclipse.jetty.security.ConstraintSecurityHandler;
-import org.eclipse.jetty.security.HashLoginService;
-import org.eclipse.jetty.security.authentication.BasicAuthenticator;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -49,25 +47,17 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class WebInfoServer {
 
-	/**
-	 * Web root dir in the jar
-	 */
+	/** Web root dir in the jar */
 	private static final String WEB_ROOT_DIR = "web-docs-infoserver";
 
-	/**
-	 * The log for this class.
-	 */
+	/** The log for this class. */
 	private static final Logger LOG = LoggerFactory.getLogger(WebInfoServer.class);
 
-	/**
-	 * The jetty server serving all requests.
-	 */
+	/** The jetty server serving all requests. */
 	private final Server server;
 
-	/**
-	 * The assigned port where jetty is running.
-	 */
-	private int assignedPort;
+	/** The assigned port where jetty is running. */
+	private int assignedPort = -1;
 
 	/**
 	 * Creates a new web info server. The server runs the servlets that implement the logic
@@ -126,11 +116,9 @@ public class WebInfoServer {
 		// ----- the handlers for the servlets -----
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
-		servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager,
-				archive, timeout)), "/jobsInfo");
+		servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager, archive, timeout)), "/jobsInfo");
 		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
-		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)),
-				"/setupInfo");
+		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)), "/setupInfo");
 		servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");
 
 
@@ -141,47 +129,9 @@ public class WebInfoServer {
 
 		// ----- add the handlers to the list handler -----
 		HandlerList handlers = new HandlerList();
-		handlers.addHandler(servletContext);
 		handlers.addHandler(resourceHandler);
-
-		// ----- create the login module with http authentication -----
-
-		File af = null;
-		String authFile = config.getString(ConfigConstants.JOB_MANAGER_WEB_ACCESS_FILE_KEY, null);
-		if (authFile != null) {
-			af = new File(authFile);
-			if (!af.exists()) {
-				LOG.error("The specified file '" + af.getAbsolutePath()
-					+ "' with the authentication information is missing. Starting server without HTTP authentication.");
-				af = null;
-			}
-		}
-		if (af != null) {
-			HashLoginService loginService = new HashLoginService("Flink Jobmanager Interface", authFile);
-			server.addBean(loginService);
-
-			Constraint constraint = new Constraint();
-			constraint.setName(Constraint.__BASIC_AUTH);
-			constraint.setAuthenticate(true);
-			constraint.setRoles(new String[] { "user" });
-
-			ConstraintMapping mapping = new ConstraintMapping();
-			mapping.setPathSpec("/*");
-			mapping.setConstraint(constraint);
-
-			ConstraintSecurityHandler sh = new ConstraintSecurityHandler();
-			sh.addConstraintMapping(mapping);
-			sh.setAuthenticator(new BasicAuthenticator());
-			sh.setLoginService(loginService);
-			sh.setStrict(true);
-
-			// set the handers: the server hands the request to the security handler,
-			// which hands the request to the other handlers when authenticated
-			sh.setHandler(handlers);
-			server.setHandler(sh);
-		} else {
-			server.setHandler(handlers);
-		}
+		handlers.addHandler(servletContext);
+		server.setHandler(handlers);
 	}
 
 	/**
@@ -192,13 +142,22 @@ public class WebInfoServer {
 	 */
 	public void start() throws Exception {
 		server.start();
-		final Connector connector = server.getConnectors()[0];
-		assignedPort = connector.getLocalPort(); // we have to use getLocalPort() instead of getPort() http://stackoverflow.com/questions/8884865/how-to-discover-jetty-7-running-port
-		String host = connector.getHost();
-		if(host == null) { // as per method documentation
-			host = "0.0.0.0";
+		
+		final Connector[] connectors = server.getConnectors();
+		if (connectors != null && connectors.length > 0) {
+			Connector conn = connectors[0];
+
+			// we have to use getLocalPort() instead of getPort() http://stackoverflow.com/questions/8884865/how-to-discover-jetty-7-running-port
+			this.assignedPort = conn.getLocalPort(); 
+			String host = conn.getHost();
+			if (host == null) { // as per method documentation
+				host = "0.0.0.0";
+			}
+			LOG.info("Started web info server for JobManager on {}:{}", host, assignedPort);
+		}
+		else {
+			LOG.warn("Unable to determine local endpoint of web frontend server");
 		}
-		LOG.info("Started web info server for JobManager on {}:{}", host, assignedPort);
 	}
 
 	/**
@@ -206,7 +165,7 @@ public class WebInfoServer {
 	 */
 	public void stop() throws Exception {
 		server.stop();
-		assignedPort = 0;
+		assignedPort = -1;
 	}
 
 	public int getServerPort() {

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/InfoMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/InfoMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/InfoMessage.java
new file mode 100644
index 0000000..a3013bb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/InfoMessage.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * Abstract base interface of all info messages exchanged between the
+ * JobManager and, for example, the runtime monitor.
+ */
+public interface InfoMessage extends java.io.Serializable {}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
new file mode 100644
index 0000000..084e97d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * An overview of how many jobs are in which status.
+ */
+public class JobsOverview implements InfoMessage {
+
+	private static final long serialVersionUID = -3699051943490133183L;
+
+	private final int numJobsRunningOrPending;
+	private final int numJobsFinished;
+	private final int numJobsCancelled;
+	private final int numJobsFailed;
+
+	/** Creates an overview from the given number of jobs per status. */
+	public JobsOverview(int numJobsRunningOrPending, int numJobsFinished, int numJobsCancelled, int numJobsFailed) {
+		this.numJobsRunningOrPending = numJobsRunningOrPending;
+		this.numJobsFinished = numJobsFinished;
+		this.numJobsCancelled = numJobsCancelled;
+		this.numJobsFailed = numJobsFailed;
+	}
+
+	/** Creates an overview in which each count is the sum of the corresponding counts of both arguments. */
+	public JobsOverview(JobsOverview first, JobsOverview second) {
+		this.numJobsRunningOrPending = first.numJobsRunningOrPending + second.numJobsRunningOrPending;
+		this.numJobsFinished = first.numJobsFinished + second.numJobsFinished;
+		this.numJobsCancelled = first.numJobsCancelled + second.numJobsCancelled;
+		this.numJobsFailed = first.numJobsFailed + second.numJobsFailed;
+	}
+
+	public int getNumJobsRunningOrPending() {
+		return numJobsRunningOrPending;
+	}
+
+	public int getNumJobsFinished() {
+		return numJobsFinished;
+	}
+
+	public int getNumJobsCancelled() {
+		return numJobsCancelled;
+	}
+
+	public int getNumJobsFailed() {
+		return numJobsFailed;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Equal only to objects of exactly the same class that carry the same four counts. */
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		// Exact class match, not instanceof: keeps equals() symmetric with subclasses that override it.
+		if (obj == null || obj.getClass() != getClass()) {
+			return false;
+		}
+		JobsOverview that = (JobsOverview) obj;
+		return this.numJobsRunningOrPending == that.numJobsRunningOrPending &&
+				this.numJobsFinished == that.numJobsFinished &&
+				this.numJobsCancelled == that.numJobsCancelled &&
+				this.numJobsFailed == that.numJobsFailed;
+	}
+
+	@Override
+	public int hashCode() {
+		int result = numJobsRunningOrPending;
+		result = 31 * result + numJobsFinished;
+		result = 31 * result + numJobsCancelled;
+		result = 31 * result + numJobsFailed;
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "JobsOverview {" +
+				"numJobsRunningOrPending=" + numJobsRunningOrPending +
+				", numJobsFinished=" + numJobsFinished +
+				", numJobsCancelled=" + numJobsCancelled +
+				", numJobsFailed=" + numJobsFailed +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
new file mode 100644
index 0000000..a261c2a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** An overview of which jobs, identified by their IDs, are in which status. */
+public class JobsWithIDsOverview implements InfoMessage {
+
+	private static final long serialVersionUID = -3699051943490133183L;
+
+	private final List<JobID> jobsRunningOrPending;
+	private final List<JobID> jobsFinished;
+	private final List<JobID> jobsCancelled;
+	private final List<JobID> jobsFailed;
+
+	/** Creates an overview from the given ID lists, which are stored as passed (not copied). */
+	public JobsWithIDsOverview(List<JobID> jobsRunningOrPending, List<JobID> jobsFinished,
+								List<JobID> jobsCancelled, List<JobID> jobsFailed) {
+		this.jobsRunningOrPending = jobsRunningOrPending;
+		this.jobsFinished = jobsFinished;
+		this.jobsCancelled = jobsCancelled;
+		this.jobsFailed = jobsFailed;
+	}
+
+	/** Creates an overview whose lists hold the IDs of {@code first} followed by those of {@code second}. */
+	public JobsWithIDsOverview(JobsWithIDsOverview first, JobsWithIDsOverview second) {
+		this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
+		this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
+		this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
+		this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
+	}
+
+	public List<JobID> getJobsRunningOrPending() {
+		return jobsRunningOrPending;
+	}
+
+	public List<JobID> getJobsFinished() {
+		return jobsFinished;
+	}
+
+	public List<JobID> getJobsCancelled() {
+		return jobsCancelled;
+	}
+
+	public List<JobID> getJobsFailed() {
+		return jobsFailed;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "JobsWithIDsOverview {" +
+				"jobsRunningOrPending=" + jobsRunningOrPending +
+				", jobsFinished=" + jobsFinished +
+				", jobsCancelled=" + jobsCancelled +
+				", jobsFailed=" + jobsFailed +
+				'}';
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Concatenates both lists into a newly allocated list; neither argument is modified. */
+	private static ArrayList<JobID> combine(List<JobID> first, List<JobID> second) {
+		ArrayList<JobID> result = new ArrayList<JobID>(first.size() + second.size());
+		result.addAll(first);
+		result.addAll(second);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsOverview.java
new file mode 100644
index 0000000..651bfe3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsOverview.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * Request message asking for the number of jobs in each status. It carries no
+ * payload and is answered with a {@link JobsOverview} message.
+ */
+public class RequestJobsOverview implements InfoMessage {
+
+	private static final long serialVersionUID = 3052933564788843275L;
+
+	/** The shared instance handed out by {@link #getInstance()}. */
+	private static final RequestJobsOverview INSTANCE = new RequestJobsOverview();
+
+	/** Not instantiable from outside; use {@link #getInstance()}. */
+	private RequestJobsOverview() {}
+
+	/** Returns the shared instance of this message. */
+	public static RequestJobsOverview getInstance() {
+		return INSTANCE;
+	}
+
+	/** Makes deserialization return the shared instance, preserving the singleton property. */
+	private Object readResolve() {
+		return INSTANCE;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == null) {
+			return false;
+		}
+		final Class<?> otherClass = obj.getClass();
+		return otherClass == RequestJobsOverview.class;
+	}
+
+	@Override
+	public int hashCode() {
+		final Class<?> ownClass = RequestJobsOverview.class;
+		return ownClass.hashCode();
+	}
+
+	@Override
+	public String toString() {
+		final Class<?> ownClass = RequestJobsOverview.class;
+		return ownClass.getSimpleName();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsWithIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsWithIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsWithIDsOverview.java
new file mode 100644
index 0000000..352dfb5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobsWithIDsOverview.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * Request message asking which jobs, listed by ID, are in which status. The response
+ * is presumably a {@link JobsWithIDsOverview} message - confirm against the JobManager.
+ */
+public class RequestJobsWithIDsOverview implements InfoMessage {
+
+	private static final long serialVersionUID = 3052933564788843275L;
+
+	/** The shared instance handed out by {@link #getInstance()}. */
+	private static final RequestJobsWithIDsOverview INSTANCE = new RequestJobsWithIDsOverview();
+
+	/** Not instantiable from outside; use {@link #getInstance()}. */
+	private RequestJobsWithIDsOverview() {}
+
+	/** Returns the shared instance of this message. */
+	public static RequestJobsWithIDsOverview getInstance() {
+		return INSTANCE;
+	}
+
+	/** Makes deserialization return the shared instance, preserving the singleton property. */
+	private Object readResolve() {
+		return INSTANCE;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == null) {
+			return false;
+		}
+		final Class<?> otherClass = obj.getClass();
+		return otherClass == RequestJobsWithIDsOverview.class;
+	}
+
+	@Override
+	public int hashCode() {
+		final Class<?> ownClass = RequestJobsWithIDsOverview.class;
+		return ownClass.hashCode();
+	}
+
+	@Override
+	public String toString() {
+		final Class<?> ownClass = RequestJobsWithIDsOverview.class;
+		return ownClass.getSimpleName();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
new file mode 100644
index 0000000..c3797fd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * Request message asking for an overview of the cluster status (connected TaskManagers,
+ * total and available slots, job counts). It carries no payload and is answered with a
+ * {@link StatusOverview} message.
+ */
+public class RequestStatusOverview implements InfoMessage {
+
+	private static final long serialVersionUID = 3052933564788843275L;
+
+	/** The shared instance handed out by {@link #getInstance()}. */
+	private static final RequestStatusOverview INSTANCE = new RequestStatusOverview();
+
+	/** Not instantiable from outside; use {@link #getInstance()}. */
+	private RequestStatusOverview() {}
+
+	/** Returns the shared instance of this message. */
+	public static RequestStatusOverview getInstance() {
+		return INSTANCE;
+	}
+
+	/** Makes deserialization return the shared instance, preserving the singleton property. */
+	private Object readResolve() {
+		return INSTANCE;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == null) {
+			return false;
+		}
+		final Class<?> otherClass = obj.getClass();
+		return otherClass == RequestStatusOverview.class;
+	}
+
+	@Override
+	public int hashCode() {
+		final Class<?> ownClass = RequestStatusOverview.class;
+		return ownClass.hashCode();
+	}
+
+	@Override
+	public String toString() {
+		final Class<?> ownClass = RequestStatusOverview.class;
+		return ownClass.getSimpleName();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java
new file mode 100644
index 0000000..d26fe3d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusWithJobIDsOverview.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * Request message asking for an overview of the cluster status (connected TaskManagers,
+ * total and available slots) together with the IDs of the jobs in each status. It is
+ * answered with a {@link StatusWithJobIDsOverview} message.
+ */
+public class RequestStatusWithJobIDsOverview implements InfoMessage {
+
+	private static final long serialVersionUID = 3052933564788843275L;
+
+	/** The shared instance handed out by {@link #getInstance()}. */
+	private static final RequestStatusWithJobIDsOverview INSTANCE = new RequestStatusWithJobIDsOverview();
+
+	/** Not instantiable from outside; use {@link #getInstance()}. */
+	private RequestStatusWithJobIDsOverview() {}
+
+	/** Returns the shared instance of this message. */
+	public static RequestStatusWithJobIDsOverview getInstance() {
+		return INSTANCE;
+	}
+
+	/** Makes deserialization return the shared instance, preserving the singleton property. */
+	private Object readResolve() {
+		return INSTANCE;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == null) {
+			return false;
+		}
+		final Class<?> otherClass = obj.getClass();
+		return otherClass == RequestStatusWithJobIDsOverview.class;
+	}
+
+	@Override
+	public int hashCode() {
+		final Class<?> ownClass = RequestStatusWithJobIDsOverview.class;
+		return ownClass.hashCode();
+	}
+
+	@Override
+	public String toString() {
+		final Class<?> ownClass = RequestStatusWithJobIDsOverview.class;
+		return ownClass.getSimpleName();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
new file mode 100644
index 0000000..214141e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * Cluster status description sent in response to a {@link RequestStatusOverview} message:
+ * the job counts of {@link JobsOverview} plus TaskManager and slot counts.
+ */
+public class StatusOverview extends JobsOverview {
+
+	private static final long serialVersionUID = -729861859715105265L;
+
+	private final int numTaskManagersConnected;
+	private final int numSlotsTotal;
+	private final int numSlotsAvailable;
+
+	/** Creates a status overview from explicit cluster and per-status job counts. */
+	public StatusOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
+							int numJobsRunningOrPending, int numJobsFinished, int numJobsCancelled, int numJobsFailed) {
+		super(numJobsRunningOrPending, numJobsFinished, numJobsCancelled, numJobsFailed);
+		this.numTaskManagersConnected = numTaskManagersConnected;
+		this.numSlotsTotal = numSlotsTotal;
+		this.numSlotsAvailable = numSlotsAvailable;
+	}
+
+	/** Creates a status overview whose job counts come from combining the two given job overviews. */
+	public StatusOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
+							JobsOverview jobs1, JobsOverview jobs2) {
+		super(jobs1, jobs2);
+		this.numTaskManagersConnected = numTaskManagersConnected;
+		this.numSlotsTotal = numSlotsTotal;
+		this.numSlotsAvailable = numSlotsAvailable;
+	}
+
+	/** Returns the number of connected TaskManagers. */
+	public int getNumTaskManagersConnected() {
+		return numTaskManagersConnected;
+	}
+
+	/** Returns the total number of slots. */
+	public int getNumSlotsTotal() {
+		return numSlotsTotal;
+	}
+
+	/** Returns the number of available slots. */
+	public int getNumSlotsAvailable() {
+		return numSlotsAvailable;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		if (!(obj instanceof StatusOverview)) {
+			return false;
+		}
+		final StatusOverview other = (StatusOverview) obj;
+		final boolean sameCluster = numTaskManagersConnected == other.numTaskManagersConnected &&
+				numSlotsTotal == other.numSlotsTotal &&
+				numSlotsAvailable == other.numSlotsAvailable;
+		final boolean sameJobs = getNumJobsRunningOrPending() == other.getNumJobsRunningOrPending() &&
+				getNumJobsFinished() == other.getNumJobsFinished() &&
+				getNumJobsCancelled() == other.getNumJobsCancelled() &&
+				getNumJobsFailed() == other.getNumJobsFailed();
+		return sameCluster && sameJobs;
+	}
+
+	@Override
+	public int hashCode() {
+		int hash = 31 * super.hashCode() + numTaskManagersConnected;
+		hash = 31 * hash + numSlotsTotal;
+		return 31 * hash + numSlotsAvailable;
+	}
+
+	@Override
+	public String toString() {
+		final StringBuilder text = new StringBuilder("StatusOverview {");
+		text.append("numTaskManagersConnected=").append(numTaskManagersConnected);
+		text.append(", numSlotsTotal=").append(numSlotsTotal);
+		text.append(", numSlotsAvailable=").append(numSlotsAvailable);
+		text.append(", numJobsRunningOrPending=").append(getNumJobsRunningOrPending());
+		text.append(", numJobsFinished=").append(getNumJobsFinished());
+		text.append(", numJobsCancelled=").append(getNumJobsCancelled());
+		text.append(", numJobsFailed=").append(getNumJobsFailed());
+		return text.append('}').toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java
new file mode 100644
index 0000000..72fb01b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusWithJobIDsOverview.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.List;
+
+/**
+ * Response to the {@link org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview}
+ * message, carrying a description of the Flink cluster status. In addition to the
+ * job ID lists inherited from {@link JobsWithIDsOverview}, it holds the number of
+ * connected TaskManagers and the total and available slot counts.
+ */
+public class StatusWithJobIDsOverview extends JobsWithIDsOverview {
+
+	private static final long serialVersionUID = -729861859715105265L;
+	
+	private final int numTaskManagersConnected;
+	private final int numSlotsTotal;
+	private final int numSlotsAvailable;
+
+	/**
+	 * Creates a status overview from the cluster counters and the per-status job ID lists.
+	 *
+	 * @param numTaskManagersConnected The number of connected TaskManagers.
+	 * @param numSlotsTotal The total number of slots.
+	 * @param numSlotsAvailable The number of available slots.
+	 * @param jobsRunningOrPending The IDs of the jobs that are running or pending.
+	 * @param jobsFinished The IDs of the finished jobs.
+	 * @param jobsCancelled The IDs of the cancelled jobs.
+	 * @param jobsFailed The IDs of the failed jobs.
+	 */
+	public StatusWithJobIDsOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
+									List<JobID> jobsRunningOrPending, List<JobID> jobsFinished,
+									List<JobID> jobsCancelled, List<JobID> jobsFailed) {
+
+		super(jobsRunningOrPending, jobsFinished, jobsCancelled, jobsFailed);
+		
+		this.numTaskManagersConnected = numTaskManagersConnected;
+		this.numSlotsTotal = numSlotsTotal;
+		this.numSlotsAvailable = numSlotsAvailable;
+	}
+
+	/**
+	 * Creates a status overview from the cluster counters and two job overviews.
+	 * Both overviews are handed to the superclass constructor, which presumably
+	 * merges them into one (the JobManager passes its own jobs and the archived jobs).
+	 *
+	 * @param numTaskManagersConnected The number of connected TaskManagers.
+	 * @param numSlotsTotal The total number of slots.
+	 * @param numSlotsAvailable The number of available slots.
+	 * @param jobs1 The first job overview.
+	 * @param jobs2 The second job overview.
+	 */
+	public StatusWithJobIDsOverview(int numTaskManagersConnected, int numSlotsTotal, int numSlotsAvailable,
+									JobsWithIDsOverview jobs1, JobsWithIDsOverview jobs2) {
+		super(jobs1, jobs2);
+		this.numTaskManagersConnected = numTaskManagersConnected;
+		this.numSlotsTotal = numSlotsTotal;
+		this.numSlotsAvailable = numSlotsAvailable;
+	}
+
+	/**
+	 * @return The number of connected TaskManagers.
+	 */
+	public int getNumTaskManagersConnected() {
+		return numTaskManagersConnected;
+	}
+
+	/**
+	 * @return The total number of slots.
+	 */
+	public int getNumSlotsTotal() {
+		return numSlotsTotal;
+	}
+
+	/**
+	 * @return The number of available slots.
+	 */
+	public int getNumSlotsAvailable() {
+		return numSlotsAvailable;
+	}
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a human-readable summary. The job entries are the lists of job IDs
+	 * (not counters), so they are labeled accordingly.
+	 *
+	 * @return The string representation of this overview.
+	 */
+	@Override
+	public String toString() {
+		return "StatusWithJobIDsOverview {" +
+				"numTaskManagersConnected=" + numTaskManagersConnected +
+				", numSlotsTotal=" + numSlotsTotal +
+				", numSlotsAvailable=" + numSlotsAvailable +
+				", jobsRunningOrPending=" + getJobsRunningOrPending() +
+				", jobsFinished=" + getJobsFinished() +
+				", jobsCancelled=" + getJobsCancelled() +
+				", jobsFailed=" + getJobsFailed() +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/package-info.java
new file mode 100644
index 0000000..730cf33
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the actor messages that are sent between the
+ * JobManager and components that are interested in the status of
+ * the JobManager. An example for such a component is the web
+ * runtime monitor, which sends messages to request the status.
+ */
+package org.apache.flink.runtime.messages.webmonitor;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java
new file mode 100644
index 0000000..c2e2b0d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+/**
+ * Interface for web monitors. Defines the life-cycle methods ({@link #start()},
+ * {@link #stop()}) and a property to query the port of the web server.
+ */
+public interface WebMonitor {
+
+	/**
+	 * Starts the web monitor.
+	 * 
+	 * @throws Exception This method may forward exceptions, if it cannot bring up the web monitor.
+	 */
+	void start() throws Exception;
+
+	/**
+	 * Stops the web monitor.
+	 * 
+	 * @throws Exception This method may forward exceptions, if it cannot properly stop the web monitor.
+	 */
+	void stop() throws Exception;
+	
+	/**
+	 * Gets the port that the web server actually binds to. If port 0 was given in
+	 * the configuration, then a random free port will be picked. This method can
+	 * be used to determine this port.
+	 *
+	 * @return The port where the web server is listening, or -1, if no server is running.
+	 */
+	int getServerPort();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5cf69ec..d8d51ce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,12 +19,16 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
+import java.lang.reflect.{InvocationTargetException, Constructor}
 import java.net.InetSocketAddress
 import java.util.Collections
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
+import _root_.akka.pattern.ask
+
 import grizzled.slf4j.Logger
+
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
@@ -39,12 +43,14 @@ import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
+import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.ZooKeeperUtil
 import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
+import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -411,9 +417,11 @@ class JobManager(
 
     case message: AccumulatorMessage => handleAccumulatorMessage(message)
 
+    case message: InfoMessage => handleInfoRequestMessage(message, sender())
+
     case RequestStackTrace(instanceID) =>
       val gateway = instanceManager.getRegisteredInstanceById(instanceID).getInstanceGateway
-      gateway.forward(SendStackTrace, sender)
+      gateway.forward(SendStackTrace, sender())
 
     case Terminated(taskManager) =>
       if (instanceManager.isRegistered(taskManager)) {
@@ -502,8 +510,9 @@ class JobManager(
         }
         executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
         executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
-        executionGraph.setScheduleMode(jobGraph.getScheduleMode)
-        executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
+        executionGraph.setScheduleMode(jobGraph.getScheduleMode())
+        executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
+        executionGraph.setJsonPlan(jobGraph.getJsonPlan())
         
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create splits
@@ -737,6 +746,115 @@ class JobManager(
   }
 
   /**
+   * Dedicated handler for monitor info request messages.
+   *
+   * Each request is answered by combining a snapshot of the jobs currently held by
+   * this JobManager with the matching overview requested from the archive actor.
+   * The reply is sent to `theSender` from the success callback of the archive request.
+   *
+   * Exceptions thrown while handling the message (including the one for an
+   * unrecognized message type) are caught and logged, not propagated.
+   *
+   * NOTE(review): only `onSuccess` is registered on the archive futures. If the archive
+   * request fails or times out, or the reply has a different type, no reply is sent
+   * to `theSender` - confirm that requesters handle this via their own timeout.
+   *
+   * @param actorMessage The info request message.
+   * @param theSender The actor that receives the response.
+   */
+  private def handleInfoRequestMessage(actorMessage: InfoMessage, theSender: ActorRef): Unit = {
+    try {
+      actorMessage match {
+
+        // job counters: this JobManager's jobs plus the archived jobs
+        case _ : RequestJobsOverview =>
+          // get our own overview
+          val ourJobs = createJobStatusOverview()
+
+          // get the overview from the archive
+          val future = (archive ? RequestJobsOverview.getInstance())(timeout)
+
+          // the callback runs on the context's dispatcher and uses the values captured above
+          future.onSuccess {
+            case archiveOverview: JobsOverview =>
+              theSender ! new JobsOverview(ourJobs, archiveOverview)
+          }(context.dispatcher)
+
+        // job ID lists: this JobManager's jobs plus the archived jobs
+        case _ : RequestJobsWithIDsOverview =>
+          // get our own overview
+          val ourJobs = createJobStatusWithIDsOverview()
+
+          // get the overview from the archive
+          val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
+
+          future.onSuccess {
+            case archiveOverview: JobsWithIDsOverview =>
+              theSender ! new JobsWithIDsOverview(ourJobs, archiveOverview)
+          }(context.dispatcher)
+
+        // cluster counters plus job counters
+        case _ : RequestStatusOverview =>
+
+          val ourJobs = createJobStatusOverview()
+
+          // read the cluster counters before the asynchronous archive request
+          val numTMs = instanceManager.getNumberOfRegisteredTaskManagers()
+          val numSlotsTotal = instanceManager.getTotalNumberOfSlots()
+          val numSlotsAvailable = instanceManager.getNumberOfAvailableSlots()
+
+          // add to that the jobs from the archive
+          val future = (archive ? RequestJobsOverview.getInstance())(timeout)
+          future.onSuccess {
+            case archiveOverview: JobsOverview =>
+              theSender ! new StatusOverview(numTMs, numSlotsTotal, numSlotsAvailable,
+                ourJobs, archiveOverview)
+          }(context.dispatcher)
+
+        // cluster counters plus job ID lists
+        case _ : RequestStatusWithJobIDsOverview =>
+
+          val ourJobs = createJobStatusWithIDsOverview()
+
+          // read the cluster counters before the asynchronous archive request
+          val numTMs = instanceManager.getNumberOfRegisteredTaskManagers()
+          val numSlotsTotal = instanceManager.getTotalNumberOfSlots()
+          val numSlotsAvailable = instanceManager.getNumberOfAvailableSlots()
+
+          // add to that the jobs from the archive
+          val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
+          future.onSuccess {
+            case archiveOverview: JobsWithIDsOverview =>
+              theSender ! new StatusWithJobIDsOverview(numTMs, numSlotsTotal, numSlotsAvailable,
+                ourJobs, archiveOverview)
+          }(context.dispatcher)
+
+        // thrown inside the try block, so it ends up in the error log below
+        case _ => throw new Exception("Unrecognized info message " + actorMessage)
+      }
+    }
+    catch {
+      case e: Throwable => log.error(s"Error responding to message $actorMessage", e)
+    }
+  }
+
+  /**
+   * Counts the jobs currently held by this JobManager, grouped by their status.
+   * Every status other than FINISHED, CANCELED, and FAILED is counted as
+   * running or pending.
+   *
+   * @return The overview holding the four job counters.
+   */
+  private def createJobStatusOverview() : JobsOverview = {
+    // read the state of every job exactly once
+    val states = currentJobs.values.toList.map(_._1.getState())
+
+    val numFinished = states.count(_ == JobStatus.FINISHED)
+    val numCanceled = states.count(_ == JobStatus.CANCELED)
+    val numFailed = states.count(_ == JobStatus.FAILED)
+
+    // everything that is not in a terminal state above
+    val numRunningOrPending = states.size - numFinished - numCanceled - numFailed
+
+    new JobsOverview(numRunningOrPending, numFinished, numCanceled, numFailed)
+  }
+
+  /**
+   * Collects the IDs of the jobs currently held by this JobManager, grouped by
+   * their status. Every status other than FINISHED, CANCELED, and FAILED is
+   * treated as running or pending.
+   *
+   * @return The overview holding the four lists of job IDs.
+   */
+  private def createJobStatusWithIDsOverview() : JobsWithIDsOverview = {
+    val pendingIds = new java.util.ArrayList[JobID]()
+    val finishedIds = new java.util.ArrayList[JobID]()
+    val canceledIds = new java.util.ArrayList[JobID]()
+    val failedIds = new java.util.ArrayList[JobID]()
+
+    currentJobs.values.foreach { entry =>
+      val graph = entry._1
+
+      // pick the list that matches the job's status, then record the ID
+      val target = graph.getState() match {
+        case JobStatus.FINISHED => finishedIds
+        case JobStatus.CANCELED => canceledIds
+        case JobStatus.FAILED => failedIds
+        case _ => pendingIds
+      }
+      target.add(graph.getJobID)
+    }
+
+    new JobsWithIDsOverview(pendingIds, finishedIds, canceledIds, failedIds)
+  }
+
+  /**
    * Removes the job and sends it to the MemoryArchivist
    * @param jobID ID of the job to remove and archive
    */
@@ -948,7 +1066,17 @@ object JobManager {
       }
 
       // start the job manager web frontend
-      if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
+      if (configuration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
+        LOG.info("Starting NEW JobManger web frontend")
+        
+        // start the new web frontend. we need to load this dynamically
+        // because it is not in the same project/dependencies
+        startWebRuntimeMonitor(configuration, jobManager, archiver)
+
+        // for the time being, we need to start both web servers
+        new WebInfoServer(configuration, jobManager, archiver).start()
+      }
+      else if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
         LOG.info("Starting JobManger web frontend")
         val webServer = new WebInfoServer(configuration, jobManager, archiver)
         webServer.start()
@@ -1331,4 +1459,59 @@ object JobManager {
     val timeout = AkkaUtils.getLookupTimeout(config)
     getJobManagerRemoteReference(address, system, timeout)
   }
+
+  // --------------------------------------------------------------------------
+  //  Utilities
+  // --------------------------------------------------------------------------
+
+  /**
+   * Starts the web runtime monitor. Because the actual implementation of the
+   * runtime monitor is in another project, we load the runtime monitor dynamically:
+   * the class is looked up by name and instantiated through its constructor taking
+   * (Configuration, ActorRef, ActorRef).
+   * 
+   * Because failure to start the web runtime monitor is not considered fatal,
+   * failures to load, instantiate, or start the monitor are logged here rather
+   * than propagated to the caller.
+   * 
+   * @param config The configuration for the runtime monitor.
+   * @param jobManager The JobManager actor.
+   * @param archiver The execution graph archive actor.
+   */
+  def startWebRuntimeMonitor(config: Configuration,
+                             jobManager: ActorRef,
+                             archiver: ActorRef): Unit = {
+    // try to load and instantiate the class; null signals that this failed
+    val monitor: WebMonitor =
+      try {
+        val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
+        val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
+                                                 .asSubclass(classOf[WebMonitor])
+        
+        val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration],
+                                                                      classOf[ActorRef],
+                                                                      classOf[ActorRef])
+        ctor.newInstance(config, jobManager, archiver)
+      }
+      catch {
+        case e: ClassNotFoundException =>
+          LOG.error("Could not load web runtime monitor. " +
+              "Probable reason: flink-runtime-web is not in the classpath")
+          LOG.debug("Caught exception", e)
+          null
+        case e: InvocationTargetException =>
+          // the constructor itself failed, log the original cause
+          LOG.error("WebServer could not be created", e.getTargetException())
+          null
+        case t: Throwable =>
+          // deliberately broad: nothing from loading the monitor may escape this method
+          LOG.error("Failed to instantiate web runtime monitor.", t)
+          null
+      }
+    
+    if (monitor != null) {
+      try {
+        monitor.start()
+      }
+      catch {
+        case e: Exception => 
+          LOG.error("Failed to start web runtime monitor", e)
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 54d2f2f..6d0b220 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -18,9 +18,13 @@
 
 package org.apache.flink.runtime.jobmanager
 
+import java.util
+
 import akka.actor.Actor
+
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
 import org.apache.flink.runtime.messages.ArchiveMessages._
@@ -100,6 +104,22 @@ class MemoryArchivist(private val max_entries: Int)
 
     case RequestJobCounts =>
       sender ! (finishedCnt, canceledCnt, failedCnt)
+
+    case _ : RequestJobsOverview =>
+      try {
+        sender ! createJobsOverview()
+      }
+      catch {
+        case t: Throwable => log.error("Exception while creating the jobs overview", t)
+      }
+
+    case _ : RequestJobsWithIDsOverview =>
+      try {
+        sender ! createJobsWithIDsOverview()
+      }
+      catch {
+        case t: Throwable => log.error("Exception while creating the jobs overview", t)
+      }
   }
 
   /**
@@ -110,6 +130,51 @@ class MemoryArchivist(private val max_entries: Int)
     throw new RuntimeException("Received unknown message " + message)
   }
 
+
+  // --------------------------------------------------------------------------
+  //  Request Responses
+  // --------------------------------------------------------------------------
+  
+  /**
+   * Counts the archived jobs, grouped by their status. Every status other than
+   * FINISHED, CANCELED, and FAILED is counted as running or pending.
+   *
+   * @return The overview holding the four job counters.
+   */
+  private def createJobsOverview() : JobsOverview = {
+    // read the state of every archived graph exactly once
+    val states = graphs.values.toList.map(_.getState())
+
+    val numFinished = states.count(_ == JobStatus.FINISHED)
+    val numCanceled = states.count(_ == JobStatus.CANCELED)
+    val numFailed = states.count(_ == JobStatus.FAILED)
+
+    // everything that is not in a terminal state above
+    val numRunningOrPending = states.size - numFinished - numCanceled - numFailed
+
+    new JobsOverview(numRunningOrPending, numFinished, numCanceled, numFailed)
+  }
+
+  /**
+   * Collects the IDs of the archived jobs, grouped by their status. Every status
+   * other than FINISHED, CANCELED, and FAILED is treated as running or pending.
+   *
+   * @return The overview holding the four lists of job IDs.
+   */
+  private def createJobsWithIDsOverview() : JobsWithIDsOverview = {
+    val pendingIds = new util.ArrayList[JobID]()
+    val finishedIds = new util.ArrayList[JobID]()
+    val canceledIds = new util.ArrayList[JobID]()
+    val failedIds = new util.ArrayList[JobID]()
+
+    for (archived <- graphs.values) {
+      // pick the list that matches the job's status, then record the ID
+      val target = archived.getState() match {
+        case JobStatus.FINISHED => finishedIds
+        case JobStatus.CANCELED => canceledIds
+        case JobStatus.FAILED => failedIds
+        case _ => pendingIds
+      }
+      target.add(archived.getJobID)
+    }
+
+    new JobsWithIDsOverview(pendingIds, finishedIds, canceledIds, failedIds)
+  }
+  
+  // --------------------------------------------------------------------------
+  //  Utilities
+  // --------------------------------------------------------------------------
+
   /**
    * Remove old ExecutionGraphs belonging to a jobID
    * * if more than max_entries are in the queue.

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 424ee4c..2c28956 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -82,6 +82,10 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
     if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) {
       val webServer = new WebInfoServer(configuration, jobManager, archiver)
       webServer.start()
+
+      if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
+        JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archiver)
+      }
     }
     jobManager
   }
@@ -170,7 +174,7 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
   }
 
   def setMemory(config: Configuration): Unit = {
-    // set this only if no memory was preconfigured
+    // set this only if no memory was pre-configured
     if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
 
       val bufferSizeNew: Int = config.getInteger(
@@ -234,12 +238,5 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration,
 }
 
 object LocalFlinkMiniCluster {
-  val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
-
-  def main(args: Array[String]) {
-    var conf = new Configuration;
-    conf.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 4)
-    conf.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true)
-    var cluster = new LocalFlinkMiniCluster(conf, true)
-  }
+//  val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
index 8cf0d48..649fbbd 100644
--- a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
+++ b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
@@ -76,6 +76,94 @@ under the License.
 					<groupId>org.eclipse.jdt</groupId>
 					<artifactId>core</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-json</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jettison</groupId>
+					<artifactId>jettison</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet.jsp</groupId>
+					<artifactId>jsp-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-grizzly2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>javax.servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-client</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-grizzly2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-framework</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-rcm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish</groupId>
+					<artifactId>javax.servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.contribs</groupId>
+					<artifactId>jersey-guice</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml b/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml
index c3ed2bd..6e2c6b9 100644
--- a/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml
+++ b/flink-shaded-hadoop/flink-shaded-include-yarn/pom.xml
@@ -81,9 +81,89 @@ under the License.
 					<artifactId>servlet-api</artifactId>
 				</exclusion>
 				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-json</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jettison</groupId>
+					<artifactId>jettison</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
 					<groupId>javax.servlet.jsp</groupId>
 					<artifactId>jsp-api</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-grizzly2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>javax.servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-client</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-grizzly2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-framework</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-rcm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish</groupId>
+					<artifactId>javax.servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.contribs</groupId>
+					<artifactId>jersey-guice</artifactId>
+				</exclusion>
 			</exclusions> 
 		</dependency>
 
@@ -132,6 +212,90 @@ under the License.
 					<groupId>javax.servlet</groupId>
 					<artifactId>servlet-api</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-json</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jettison</groupId>
+					<artifactId>jettison</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet.jsp</groupId>
+					<artifactId>jsp-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-grizzly2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>javax.servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-client</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-grizzly2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-framework</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-rcm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish</groupId>
+					<artifactId>javax.servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.contribs</groupId>
+					<artifactId>jersey-guice</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 
@@ -188,6 +352,94 @@ under the License.
 					<groupId>io.netty</groupId>
 					<artifactId>netty</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-json</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jettison</groupId>
+					<artifactId>jettison</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet.jsp</groupId>
+					<artifactId>jsp-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-grizzly2+</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>javax.servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-client</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-grizzly2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-framework</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-rcm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish</groupId>
+					<artifactId>javax.servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.contribs</groupId>
+					<artifactId>jersey-guice</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 
@@ -244,6 +496,94 @@ under the License.
 					<groupId>io.netty</groupId>
 					<artifactId>netty</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-json</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jettison</groupId>
+					<artifactId>jettison</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet.jsp</groupId>
+					<artifactId>jsp-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-grizzly2+</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>javax.servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-client</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-grizzly2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-framework</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-rcm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish</groupId>
+					<artifactId>javax.servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.contribs</groupId>
+					<artifactId>jersey-guice</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 
@@ -300,6 +640,94 @@ under the License.
 					<groupId>io.netty</groupId>
 					<artifactId>netty</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-json</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jettison</groupId>
+					<artifactId>jettison</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet.jsp</groupId>
+					<artifactId>jsp-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-grizzly2+</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.jersey-test-framework</groupId>
+					<artifactId>jersey-test-framework-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>javax.servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-client</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey</groupId>
+					<artifactId>jersey-grizzly2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-framework</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-server</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-rcm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish.grizzly</groupId>
+					<artifactId>grizzly-http-servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.glassfish</groupId>
+					<artifactId>javax.servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jersey.contribs</groupId>
+					<artifactId>jersey-guice</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index e108970..b1134ba 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -216,9 +216,7 @@ public class TestBaseUtils {
 		}
 		return readers;
 	}
-
-
-
+	
 	public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
 		return getResultInputStream(resultPath, new String[]{});
 	}
@@ -480,9 +478,9 @@ public class TestBaseUtils {
 			cienv.putAll(newenv);
 		} catch (NoSuchFieldException e) {
 			try {
-				Class[] classes = Collections.class.getDeclaredClasses();
+				Class<?>[] classes = Collections.class.getDeclaredClasses();
 				Map<String, String> env = System.getenv();
-				for (Class cl : classes) {
+				for (Class<?> cl : classes) {
 					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
 						Field field = cl.getDeclaredField("m");
 						field.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 509b86f..79e75d2 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -113,9 +113,15 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration,
 
     val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
 
-    if (userConfiguration.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER,false)){
+    if (userConfiguration.getBoolean(
+      ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false))
+    {
       val webServer = new WebInfoServer(configuration, jobManager, archive)
       webServer.start()
+
+      if (userConfiguration.getBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, false)) {
+        JobManager.startWebRuntimeMonitor(userConfiguration, jobManager, archive)
+      }
     }
 
     jobManager

http://git-wip-us.apache.org/repos/asf/flink/blob/44ee1c1b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1f63ff3..175c008 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@ under the License.
 		<module>flink-java</module>
 		<module>flink-scala</module>
 		<module>flink-runtime</module>
+		<module>flink-runtime-web</module>
 		<module>flink-optimizer</module>
 		<module>flink-examples</module>
 		<module>flink-clients</module>