Posted to commits@flink.apache.org by se...@apache.org on 2015/10/17 21:03:41 UTC

[21/21] flink git commit: [FLINK-2844] [web frontend] Remove old web interface

[FLINK-2844] [web frontend] Remove old web interface

- make the new web interface the default
- adapt tests
- make web directory a resource to be included in the fat jar
- serve the web interface's static files dynamically through the class loader (sketched below)
- run on YARN
- remove Jetty dependencies from poms
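
On the class-loader point: the new StaticFileServerHandler (diff below) falls back to the class loader when a requested file is not yet present under the temporary web root, extracts it once, and then serves it from disk. A minimal, self-contained sketch of that pattern follows; the class name ClasspathStaticFiles and the resolve() helper are illustrative only and are not part of this patch.

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;

// Illustrative sketch (not part of the patch): serve static web files from the
// classpath by extracting them into a temporary web root on first request.
public class ClasspathStaticFiles {

	// Returns the on-disk file for a request path, extracting it from the
	// classpath (where the pom packages it under "web/**") if it is missing.
	static File resolve(File webRootDir, String requestPath) throws IOException {
		File file = new File(webRootDir, requestPath);
		if (!file.exists()) {
			ClassLoader cl = ClasspathStaticFiles.class.getClassLoader();
			try (InputStream in = cl.getResourceAsStream("web" + requestPath)) {
				if (in == null) {
					return null; // neither on disk nor on the classpath -> 404
				}
				file.getParentFile().mkdirs();
				Files.copy(in, file.toPath());
			}
		}
		return file;
	}

	public static void main(String[] args) throws IOException {
		File webRoot = Files.createTempDirectory("flink-web-").toFile();
		File index = resolve(webRoot, "/index.html");
		System.out.println(index != null ? "serving " + index : "not found");
	}
}
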


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df448625
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df448625
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df448625

Branch: refs/heads/master
Commit: df448625817817ecdc897f2e14b03bfff426467d
Parents: a8eeb3b
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Oct 9 17:17:52 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Oct 17 18:45:02 2015 +0200

----------------------------------------------------------------------
 flink-clients/pom.xml                           |    8 +-
 .../flink/configuration/ConfigConstants.java    |   14 +-
 flink-dist/src/main/assemblies/bin.xml          |    7 -
 flink-runtime-web/pom.xml                       |   12 +
 .../runtime/webmonitor/WebMonitorConfig.java    |   25 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  114 +-
 .../files/StaticFileServerHandler.java          |   18 +
 flink-runtime/pom.xml                           |   12 -
 .../jobmanager/web/JobManagerInfoServlet.java   |  737 --
 .../runtime/jobmanager/web/JsonFactory.java     |  112 -
 .../jobmanager/web/LogfileInfoServlet.java      |  116 -
 .../runtime/jobmanager/web/MenuServlet.java     |  120 -
 .../jobmanager/web/SetupInfoServlet.java        |  216 -
 .../runtime/jobmanager/web/WebInfoServer.java   |  293 -
 .../runtime/webmonitor/WebMonitorUtils.java     |  105 +-
 .../resources/web-docs-infoserver/analyze.html  |  162 -
 .../web-docs-infoserver/blank-page.html         |  115 -
 .../web-docs-infoserver/configuration.html      |  121 -
 .../web-docs-infoserver/css/bootstrap.css       | 5831 -----------
 .../web-docs-infoserver/css/bootstrap.css.map   |    1 -
 .../web-docs-infoserver/css/bootstrap.min.css   |    7 -
 .../web-docs-infoserver/css/nephelefrontend.css |   21 -
 .../web-docs-infoserver/css/rickshaw.min.css    |    1 -
 .../web-docs-infoserver/css/sb-admin.css        |  164 -
 .../web-docs-infoserver/css/timeline.css        |  204 -
 .../font-awesome/css/font-awesome.css           | 1338 ---
 .../font-awesome/css/font-awesome.min.css       |    4 -
 .../font-awesome/fonts/FontAwesome.otf          |  Bin 62856 -> 0 bytes
 .../font-awesome/fonts/fontawesome-webfont.eot  |  Bin 38205 -> 0 bytes
 .../font-awesome/fonts/fontawesome-webfont.svg  |  414 -
 .../font-awesome/fonts/fontawesome-webfont.ttf  |  Bin 80652 -> 0 bytes
 .../font-awesome/fonts/fontawesome-webfont.woff |  Bin 44432 -> 0 bytes
 .../font-awesome/less/bordered-pulled.less      |   16 -
 .../font-awesome/less/core.less                 |   12 -
 .../font-awesome/less/fixed-width.less          |    6 -
 .../font-awesome/less/font-awesome.less         |   17 -
 .../font-awesome/less/icons.less                |  412 -
 .../font-awesome/less/larger.less               |   13 -
 .../font-awesome/less/list.less                 |   19 -
 .../font-awesome/less/mixins.less               |   20 -
 .../font-awesome/less/path.less                 |   14 -
 .../font-awesome/less/rotated-flipped.less      |    9 -
 .../font-awesome/less/spinning.less             |   30 -
 .../font-awesome/less/stacked.less              |   20 -
 .../font-awesome/less/variables.less            |  381 -
 .../font-awesome/scss/_bordered-pulled.scss     |   16 -
 .../font-awesome/scss/_core.scss                |   12 -
 .../font-awesome/scss/_fixed-width.scss         |    6 -
 .../font-awesome/scss/_icons.scss               |  412 -
 .../font-awesome/scss/_larger.scss              |   13 -
 .../font-awesome/scss/_list.scss                |   19 -
 .../font-awesome/scss/_mixins.scss              |   20 -
 .../font-awesome/scss/_path.scss                |   14 -
 .../font-awesome/scss/_rotated-flipped.scss     |    9 -
 .../font-awesome/scss/_spinning.scss            |   30 -
 .../font-awesome/scss/_stacked.scss             |   20 -
 .../font-awesome/scss/_variables.scss           |  381 -
 .../font-awesome/scss/font-awesome.scss         |   17 -
 .../resources/web-docs-infoserver/history.html  |  126 -
 .../web-docs-infoserver/img/flink-logo.png      |  Bin 6096 -> 0 bytes
 .../resources/web-docs-infoserver/index.html    |  223 -
 .../web-docs-infoserver/js/analyzer.js          |  329 -
 .../web-docs-infoserver/js/bootstrap.js         | 1951 ----
 .../web-docs-infoserver/js/configuration.js     |   40 -
 .../web-docs-infoserver/js/d3.layout.min.js     |    1 -
 .../resources/web-docs-infoserver/js/d3.min.js  |    2 -
 .../resources/web-docs-infoserver/js/helpers.js |   40 -
 .../web-docs-infoserver/js/jcanvas.min.js       |   61 -
 .../js/jobmanagerFrontend.js                    |  470 -
 .../web-docs-infoserver/js/jquery-2.1.0.js      | 9111 ------------------
 .../web-docs-infoserver/js/rickshaw.min.js      |    3 -
 .../web-docs-infoserver/js/taskmanager.js       |  464 -
 .../web-docs-infoserver/js/timeline.js          | 6444 -------------
 .../web-docs-infoserver/taskmanagers.html       |  182 -
 .../flink/runtime/jobmanager/JobManager.scala   |   71 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   24 +-
 flink-tests/pom.xml                             |    9 +-
 .../flink/test/web/WebFrontendITCase.java       |   76 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |   31 +-
 pom.xml                                         |   30 -
 80 files changed, 276 insertions(+), 31642 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 84264f9..928ba4f 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -73,23 +73,23 @@ under the License.
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>org.eclipse.jetty</groupId>
 			<artifactId>jetty-server</artifactId>
-			<!-- version is derived from base module -->
+			<version>8.0.0.M1</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.eclipse.jetty</groupId>
 			<artifactId>jetty-security</artifactId>
-			<!-- version is derived from base module -->
+			<version>8.0.0.M1</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.eclipse.jetty</groupId>
 			<artifactId>jetty-servlet</artifactId>
-			<!-- version is derived from base module -->
+			<version>8.0.0.M1</version>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b1ffdd8..5d6f1c7 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -293,23 +293,15 @@ public final class ConfigConstants {
 	public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
 
 	/**
-	 * The option that specifies whether to use the new web frontend
-	 */
-	public static final String JOB_MANAGER_NEW_WEB_FRONTEND_KEY = "jobmanager.new-web-frontend";
-	
-	/**
 	 * The config parameter defining the number of archived jobs for the jobmanager
 	 */
 	public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";
-	
+
 	public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath";
 
-	/** The directory where the web server's static contents is stored */
-	public static final String JOB_MANAGER_WEB_DOC_ROOT_KEY = "jobmanager.web.docroot";
-	
-	
+
 	// ------------------------------ Web Client ------------------------------
-	
+
 	/**
 	 * The config parameter defining port for the pact web-frontend server.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 09102ef..e20e94a 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -123,13 +123,6 @@ under the License.
 			<fileMode>0644</fileMode>
 		</fileSet>
 
-		<!-- copy the web documents -->
-		<fileSet>
-			<directory>../flink-runtime-web/web-dashboard/web</directory>
-			<outputDirectory>resources/web-runtime-monitor</outputDirectory>
-			<fileMode>0644</fileMode>
-		</fileSet>
-		
 		<!-- copy the tools -->
 		<fileSet>
 			<directory>src/main/flink-bin/tools</directory>

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index afe71b4..ffb68bc 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -34,6 +34,18 @@ under the License.
 
 	<packaging>jar</packaging>
 
+	<build>
+		<resources>
+			<resource>
+				<!-- Only include the web folder from the web-dashboard directory -->
+				<directory>web-dashboard</directory>
+				<includes>
+					<include>web/**</include>
+				</includes>
+			</resource>
+		</resources>
+	</build>
+
 	<dependencies>
 
 		<!-- ===================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index c8e64c9..5b537b7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -31,13 +31,10 @@ public class WebMonitorConfig {
 	/** The port for the runtime monitor web-frontend server. */
 	public static final String JOB_MANAGER_WEB_PORT_KEY = ConfigConstants.JOB_MANAGER_WEB_PORT_KEY;
 
-	/** The directory where the web server's static contents is stored */
-	public static final String JOB_MANAGER_WEB_DOC_ROOT_KEY = ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY;
-
 	/** The initial refresh interval for the web dashboard */
 	public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = "jobmanager.web.refresh-interval";
-	
-	
+
+
 	// ------------------------------------------------------------------------
 	//  Default values
 	// ------------------------------------------------------------------------
@@ -47,32 +44,28 @@ public class WebMonitorConfig {
 
 	/** Default refresh interval for the web dashboard (= 3000 msecs) */
 	public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000;
-	
-	
+
+
 	// ------------------------------------------------------------------------
 	//  Config
 	// ------------------------------------------------------------------------
-	
+
 	/** The configuration queried by this config object */
 	private final Configuration config;
 
-	
+
 	public WebMonitorConfig(Configuration config) {
 		if (config == null) {
 			throw new NullPointerException();
 		}
 		this.config = config;
 	}
-	
-	
+
+
 	public int getWebFrontendPort() {
 		return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
 	}
-	
-	public String getWebRoot() {
-		return config.getString(JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
-	}
-	
+
 	public long getRefreshInterval() {
 		return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 96da0c8..7c252dc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -32,9 +32,8 @@ import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Router;
 
 import io.netty.handler.stream.ChunkedWriteHandler;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -66,7 +65,9 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -82,15 +83,12 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	/** By default, all requests to the JobManager have a timeout of 10 seconds */ 
 	public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
-	
+
 	/** Logger for web frontend startup / shutdown messages */
 	private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
-	
-	/** Teh default path under which the static contents is stored */
-	private static final String STATIC_CONTENTS_PATH = "resources/web-runtime-monitor";
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	/** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
 	private final Object startupShutdownLock = new Object();
 
@@ -98,48 +96,34 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	/** LeaderRetrievalListener which stores the currently leading JobManager and its archive */
 	private final JobManagerArchiveRetriever retriever;
-	
+
 	private final Router router;
 
 	private final int configuredPort;
 
 	private ServerBootstrap bootstrap;
-	
+
 	private Channel serverChannel;
 
-	
+	private final File webRootDir;
+
+	private AtomicBoolean isShutdown = new AtomicBoolean();
+
+
 	public WebRuntimeMonitor(
 				Configuration config,
 				LeaderRetrievalService leaderRetrievalService,
 				ActorSystem actorSystem) throws IOException
 	{
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
-		
+
 		final WebMonitorConfig cfg = new WebMonitorConfig(config);
-		
-		// figure out where our static contents is
-		final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
-		final String configuredWebRoot = cfg.getWebRoot();
-		
-		final File webRootDir;
-		if (configuredWebRoot != null) {
-			webRootDir = new File(configuredWebRoot);
-		}
-		else if (flinkRoot != null) {
-			webRootDir = new File(flinkRoot, STATIC_CONTENTS_PATH);
-		}
-		else {
-			throw new IllegalConfigurationException("The given configuration provides neither the web-document root (" 
-					+ WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY + "), not the Flink installation root ("
-					+ ConfigConstants.FLINK_BASE_DIR_PATH_KEY + ").");
-		}
-		
-		// validate that the doc root is a valid directory
-		if (!(webRootDir.exists() && webRootDir.isDirectory() && webRootDir.canRead())) {
-			throw new IllegalConfigurationException("The path to the static contents (" + 
-					webRootDir.getAbsolutePath() + ") is not a readable directory.");
-		}
-		
+
+		// create an empty directory in temp for the web server
+		String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
+		webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
+		LOG.info("Using directory {} for the web interface files", webRootDir);
+
 		// port configuration
 		this.configuredPort = cfg.getWebFrontendPort();
 		if (this.configuredPort < 0) {
@@ -150,7 +134,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);
 
 		retriever = new JobManagerArchiveRetriever(this, actorSystem, lookupTimeout, timeout);
-		
+
 		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(retriever);
 
 		router = new Router()
@@ -200,13 +184,29 @@ public class WebRuntimeMonitor implements WebMonitor {
 			if (this.bootstrap != null) {
 				throw new IllegalStateException("The server has already been started");
 			}
-			
+
+			// add shutdown hook for deleting the directory
+			try {
+				Runtime.getRuntime().addShutdownHook(new Thread() {
+					@Override
+					public void run() {
+						shutdown();
+					}
+				});
+			} catch (IllegalStateException e) {
+				// race, JVM is in shutdown already, we can safely ignore this
+				LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
+			} catch(Throwable t) {
+				// these errors usually happen when the shutdown is already in progress
+				LOG.warn("Error while adding shutdown hook", t);
+			}
+
 			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 	
 				@Override
 				protected void initChannel(SocketChannel ch) {
 					Handler handler = new Handler(router);
-					
+
 					ch.pipeline()
 						.addLast(new HttpServerCodec())
 						.addLast(new HttpObjectAggregator(65536))
@@ -214,34 +214,34 @@ public class WebRuntimeMonitor implements WebMonitor {
 						.addLast(handler.name(), handler);
 				}
 			};
-			
+
 			NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
 			NioEventLoopGroup workerGroup = new NioEventLoopGroup();
-	
+
 			this.bootstrap = new ServerBootstrap();
 			this.bootstrap
 					.group(bossGroup, workerGroup)
 					.channel(NioServerSocketChannel.class)
 					.childHandler(initializer);
-	
+
 			Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
 			this.serverChannel = ch;
-			
+
 			InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
 			String address = bindAddress.getAddress().getHostAddress();
 			int port = bindAddress.getPort();
-			
+
 			LOG.info("Web frontend listening at " + address + ':' + port);
 
 			leaderRetrievalService.start(retriever);
 		}
 	}
-	
+
 	@Override
 	public void stop() throws Exception {
 		synchronized (startupShutdownLock) {
 			leaderRetrievalService.stop();
-			
+
 			if (this.serverChannel != null) {
 				this.serverChannel.close().awaitUninterruptibly();
 				this.serverChannel = null;
@@ -252,9 +252,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 				}
 				this.bootstrap = null;
 			}
+
+			shutdown();
 		}
 	}
-	
+
 	@Override
 	public int getServerPort() {
 		Channel server = this.serverChannel;
@@ -266,14 +268,26 @@ public class WebRuntimeMonitor implements WebMonitor {
 				LOG.error("Cannot access local server port", e);
 			}
 		}
-			
+
 		return -1;
 	}
-	
+
+	private void shutdown() {
+		if (!isShutdown.compareAndSet(false, true)) {
+			return;
+		}
+		try {
+			LOG.info("Removing web root dir {}", webRootDir);
+			FileUtils.deleteDirectory(webRootDir);
+		} catch (Throwable t) {
+			LOG.warn("Error while deleting web root dir {}", webRootDir, t);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	private static RuntimeMonitorHandler handler(RequestHandler handler) {
 		return new RuntimeMonitorHandler(handler);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index e368ea9..b1497f9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -49,7 +49,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.InputStream;
 import java.io.RandomAccessFile;
+import java.nio.file.Files;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
@@ -135,6 +137,22 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 		// convert to absolute path
 		final File file = new File(rootPath, requestPath);
+
+		if(!file.exists()) {
+			// file does not exist. Try to load it with the classloader
+			ClassLoader cl = StaticFileServerHandler.class.getClassLoader();
+			try(InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) {
+				if (resourceStream == null) {
+						logger.debug("Unable to load requested file {} from classloader", requestPath);
+						sendError(ctx, NOT_FOUND);
+						return;
+				}
+				logger.debug("Loading missing file from classloader: {}", requestPath);
+				// ensure that directory to file exists.
+				file.getParentFile().mkdirs();
+				Files.copy(resourceStream, file.toPath());
+			}
+		}
 		
 		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
 			sendError(ctx, NOT_FOUND);

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 1802709..a831eba 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -74,18 +74,6 @@ under the License.
 			<groupId>commons-io</groupId>
 			<artifactId>commons-io</artifactId>
 		</dependency>
-		
-		<dependency>
-			<groupId>org.eclipse.jetty</groupId>
-			<artifactId>jetty-server</artifactId>
-			<!-- version is derived from base module -->
-		</dependency>
-
-		<dependency>
-			<groupId>org.eclipse.jetty</groupId>
-			<artifactId>jetty-servlet</artifactId>
-			<!-- version is derived from base module -->
-		</dependency>
 
 		<dependency>
 			<groupId>com.amazonaws</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
deleted file mode 100644
index 0ecc941..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ /dev/null
@@ -1,737 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
-import org.apache.flink.runtime.messages.ArchiveMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobResponse;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultStringsFound;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound;
-import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.StringUtils;
-import org.eclipse.jetty.io.EofException;
-
-import scala.Tuple3;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-public class JobManagerInfoServlet extends HttpServlet {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoServlet.class);
-
-	/** Underlying JobManager */
-	private final ActorGateway jobmanager;
-	private final ActorGateway archive;
-	private final FiniteDuration timeout;
-
-
-	public JobManagerInfoServlet(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
-		this.jobmanager = jobmanager;
-		this.archive = archive;
-		this.timeout = timeout;
-	}
-
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException,
-			IOException {
-
-		resp.setStatus(HttpServletResponse.SC_OK);
-		resp.setContentType("application/json");
-
-		Future<Object> response;
-		Object result;
-
-		try {
-			if("archive".equals(req.getParameter("get"))) {
-				response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
-
-				result = Await.result(response, timeout);
-
-				if(!(result instanceof ArchivedJobs)) {
-					throw new RuntimeException("RequestArchiveJobs requires a response of type " +
-							"ArchivedJobs. Instead the response is of type " + result.getClass() +
-							".");
-				} else {
-					final List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(
-							((ArchivedJobs) result).asJavaCollection());
-
-					writeJsonForArchive(resp.getWriter(), archivedJobs);
-				}
-			}
-			else if("jobcounts".equals(req.getParameter("get"))) {
-				response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
-
-				result = Await.result(response, timeout);
-
-				if(!(result instanceof Tuple3)) {
-					throw new RuntimeException("RequestJobCounts requires a response of type " +
-							"Tuple3. Instead the response is of type " + result.getClass() +
-							".");
-				} else {
-					writeJsonForJobCounts(resp.getWriter(), (Tuple3)result);
-				}
-			}
-			else if("job".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-
-				response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout);
-
-				result = Await.result(response, timeout);
-
-				if(!(result instanceof JobResponse)){
-					throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
-							"Instead the response is of type " + result.getClass());
-				}else {
-					final JobResponse jobResponse = (JobResponse) result;
-
-					if(jobResponse instanceof JobFound){
-						ExecutionGraph archivedJob = ((JobFound)result).executionGraph();
-						writeJsonForArchivedJob(resp.getWriter(), archivedJob);
-					} else {
-						LOG.warn("DoGet:job: Could not find job for job ID " + jobId);
-					}
-				}
-			}
-			else if("groupvertex".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				String groupVertexId = req.getParameter("groupvertex");
-
-				// No group vertex specified
-				if (groupVertexId.equals("null")) {
-					return;
-				}
-
-				response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout);
-
-				result = Await.result(response, timeout);
-
-				if(!(result instanceof JobResponse)){
-					throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
-							"Instead the response is of type " + result.getClass());
-				}else {
-					final JobResponse jobResponse = (JobResponse) result;
-
-					if(jobResponse instanceof JobFound && groupVertexId != null){
-						ExecutionGraph archivedJob = ((JobFound)jobResponse).executionGraph();
-
-						writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob,
-								JobVertexID.fromHexString(groupVertexId));
-					} else {
-						LOG.warn("DoGet:groupvertex: Could not find job for job ID " + jobId);
-					}
-				}
-			}
-			else if("taskmanagers".equals(req.getParameter("get"))) {
-
-				response = jobmanager.ask(
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						timeout);
-
-				result = Await.result(response, timeout);
-
-				if(!(result instanceof Integer)) {
-					throw new RuntimeException("RequestNumberRegisteredTaskManager requires a " +
-							"response of type Integer. Instead the response is of type " +
-							result.getClass() + ".");
-				} else {
-					final int numberOfTaskManagers = (Integer)result;
-
-					final Future<Object> responseRegisteredSlots = jobmanager.ask(
-							JobManagerMessages.getRequestTotalNumberOfSlots(),
-							timeout);
-
-					final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
-							timeout);
-
-					if(!(resultRegisteredSlots instanceof Integer)) {
-						throw new RuntimeException("RequestTotalNumberOfSlots requires a response of " +
-								"type Integer. Instaed the response of type " +
-								resultRegisteredSlots.getClass() + ".");
-					} else {
-						final int numberOfRegisteredSlots = (Integer) resultRegisteredSlots;
-
-						resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
-								"\"slots\": "+numberOfRegisteredSlots+"}");
-					}
-				}
-			}
-			else if("cancel".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-
-				response = jobmanager.ask(
-						new CancelJob(JobID.fromHexString(jobId)),
-						timeout);
-
-				Await.ready(response, timeout);
-			}
-			else if("updates".equals(req.getParameter("get"))) {
-				String jobId = req.getParameter("job");
-				writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId));
-			} else if ("version".equals(req.getParameter("get"))) {
-				writeJsonForVersion(resp.getWriter());
-			}
-			else{
-				response = jobmanager.ask(
-						JobManagerMessages.getRequestRunningJobs(),
-						timeout);
-
-				result = Await.result(response, timeout);
-
-				if(!(result instanceof RunningJobs)){
-					throw new RuntimeException("RequestRunningJobs requires a response of type " +
-							"RunningJobs. Instead the response of type " + result.getClass() + ".");
-				} else {
-					final Iterable<ExecutionGraph> runningJobs =
-							((RunningJobs) result).asJavaIterable();
-
-					writeJsonForJobs(resp.getWriter(), runningJobs);
-				}
-			}
-
-		} catch (Exception e) {
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			resp.getWriter().print(e.getMessage());
-			if (LOG.isWarnEnabled()) {
-				LOG.warn(StringUtils.stringifyException(e));
-			}
-		}
-	}
-
-	/**
-	 * Writes ManagementGraph as Json for all recent jobs
-	 *
-	 * @param wrt
-	 * @param graphs
-	 */
-	private void writeJsonForJobs(PrintWriter wrt, Iterable<ExecutionGraph> graphs) {
-		try {
-			wrt.write("[");
-
-			Iterator<ExecutionGraph> it = graphs.iterator();
-			// Loop Jobs
-			while(it.hasNext()){
-				ExecutionGraph graph = it.next();
-
-				writeJsonForJob(wrt, graph);
-
-				//Write seperator between json objects
-				if(it.hasNext()) {
-					wrt.write(",");
-				}
-			}
-			wrt.write("]");
-
-		} catch (EofException eof) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, EofException");
-		} catch (IOException ioe) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Connection closed by client, IOException");
-		}
-	}
-
-	private void writeJsonForJob(PrintWriter wrt, ExecutionGraph graph) throws IOException {
-		//Serialize job to json
-		wrt.write("{");
-		wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-		wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-		wrt.write("\"status\": \""+ graph.getState() + "\",");
-		wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState())+",");
-
-		// Serialize ManagementGraph to json
-		wrt.write("\"groupvertices\": [");
-		boolean first = true;
-
-		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-			//Write seperator between json objects
-			if(first) {
-				first = false;
-			} else {
-				wrt.write(","); }
-
-			wrt.write(JsonFactory.toJson(groupVertex));
-		}
-		wrt.write("]");
-		wrt.write("}");
-
-	}
-
-	/**
-	 * Writes Json with a list of currently archived jobs, sorted by time
-	 *
-	 * @param wrt
-	 * @param graphs
-	 */
-	private void writeJsonForArchive(PrintWriter wrt, List<ExecutionGraph> graphs) {
-
-		wrt.write("[");
-
-		// sort jobs by time
-		Collections.sort(graphs,  new Comparator<ExecutionGraph>() {
-			@Override
-			public int compare(ExecutionGraph o1, ExecutionGraph o2) {
-				if(o1.getStatusTimestamp(o1.getState()) < o2.getStatusTimestamp(o2.getState())) {
-					return 1;
-				} else {
-					return -1;
-				}
-			}
-
-		});
-
-		// Loop Jobs
-		for (int i = 0; i < graphs.size(); i++) {
-			ExecutionGraph graph = graphs.get(i);
-
-			//Serialize job to json
-			wrt.write("{");
-			wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-			wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-			wrt.write("\"status\": \""+ graph.getState() + "\",");
-			wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState()));
-
-			wrt.write("}");
-
-			//Write seperator between json objects
-			if(i != graphs.size() - 1) {
-				wrt.write(",");
-			}
-		}
-		wrt.write("]");
-
-	}
-
-	/**
-	 * Writes Json with the job counts
-	 *
-	 * @param wrt
-	 * @param jobCounts
-	 */
-	private void writeJsonForJobCounts(PrintWriter wrt, Tuple3<Integer, Integer, Integer> jobCounts) {
-
-		wrt.write("{");
-		wrt.write("\"finished\": " + jobCounts._1() + ",");
-		wrt.write("\"canceled\": " + jobCounts._2() + ",");
-		wrt.write("\"failed\": "   + jobCounts._3());
-		wrt.write("}");
-
-	}
-
-	/**
-	 * Writes infos about archived job in Json format, including groupvertices and groupverticetimes
-	 *
-	 * @param wrt
-	 * @param graph
-	 */
-	private void writeJsonForArchivedJob(PrintWriter wrt, ExecutionGraph graph) {
-		try {
-			wrt.write("[");
-
-			//Serialize job to json
-			wrt.write("{");
-			wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-			wrt.write("\"jobname\": \"" + graph.getJobName() + "\",");
-			wrt.write("\"status\": \"" + graph.getState() + "\",");
-			wrt.write("\"SCHEDULED\": " + graph.getStatusTimestamp(JobStatus.CREATED) + ",");
-			wrt.write("\"RUNNING\": " + graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
-			wrt.write("\"FINISHED\": " + graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
-			wrt.write("\"FAILED\": " + graph.getStatusTimestamp(JobStatus.FAILED) + ",");
-			wrt.write("\"CANCELED\": " + graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
-
-			if (graph.getState() == JobStatus.FAILED) {
-				wrt.write("\"failednodes\": [");
-				boolean first = true;
-				for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
-					if (vertex.getExecutionState() == ExecutionState.FAILED) {
-						InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
-						Throwable failureCause = vertex.getFailureCause();
-						if (location != null || failureCause != null) {
-							if (first) {
-								first = false;
-							} else {
-								wrt.write(",");
-							}
-							wrt.write("{");
-							wrt.write("\"node\": \"" + (location == null ? "(none)" : location.getFQDNHostname()) + "\",");
-							wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\"");
-							wrt.write("}");
-						}
-					}
-				}
-				wrt.write("],");
-			}
-
-			// Serialize ManagementGraph to json
-			wrt.write("\"groupvertices\": [");
-			boolean first = true;
-			for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-				//Write seperator between json objects
-				if (first) {
-					first = false;
-				} else {
-					wrt.write(",");
-				}
-
-				wrt.write(JsonFactory.toJson(groupVertex));
-
-			}
-			wrt.write("],");
-
-			// write user config
-			ExecutionConfig ec = graph.getExecutionConfig();
-			if(ec != null) {
-				wrt.write("\"executionConfig\": {");
-				wrt.write("\"Execution Mode\": \""+ec.getExecutionMode()+"\",");
-				wrt.write("\"Max. number of execution retries\": \""+ec.getNumberOfExecutionRetries()+"\",");
-				wrt.write("\"Job parallelism\": \""+ec.getParallelism()+"\",");
-				wrt.write("\"Object reuse mode\": \""+ec.isObjectReuseEnabled()+"\"");
-				ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
-				if(uc != null) {
-					Map<String, String> ucVals = uc.toMap();
-					if (ucVals != null) {
-						String ucString = "{";
-						int i = 0;
-						for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
-							ucString += "\"" + ucVal.getKey() + "\":\"" + ucVal.getValue() + "\"";
-							if (++i < ucVals.size()) {
-								ucString += ",\n";
-							}
-						}
-						wrt.write(", \"userConfig\": " + ucString + "}");
-					}
-					else {
-						LOG.debug("GlobalJobParameters.toMap() did not return anything");
-					}
-				}
-				else {
-					LOG.debug("No GlobalJobParameters were set in the execution config");
-				}
-				wrt.write("},");
-			} else {
-				LOG.warn("Unable to retrieve execution config from execution graph");
-			}
-
-			// write accumulators
-			final Future<Object> response = jobmanager.ask(
-					new RequestAccumulatorResultsStringified(graph.getJobID()), timeout);
-
-			Object result;
-			try {
-				result = Await.result(response, timeout);
-			} catch (Exception ex) {
-				throw new IOException("Could not retrieve the accumulator results from the job manager.", ex);
-			}
-
-			if (result instanceof AccumulatorResultStringsFound) {
-				StringifiedAccumulatorResult[] accumulators = ((AccumulatorResultStringsFound) result).result();
-
-				wrt.write("\n\"accumulators\": [");
-				int i = 0;
-				for (StringifiedAccumulatorResult accumulator : accumulators) {
-					wrt.write("{ \"name\": \"" + accumulator.getName() + " (" + accumulator.getType() + ")\","
-							+ " \"value\": \"" + accumulator.getValue() + "\"}\n");
-					if (++i < accumulators.length) {
-						wrt.write(",");
-					}
-				}
-				wrt.write("],\n");
-			}
-			else if (result instanceof AccumulatorResultsNotFound) {
-				wrt.write("\n\"accumulators\": [],");
-			}
-			else if (result instanceof AccumulatorResultsErroneous) {
-				LOG.error("Could not obtain accumulators for job " + graph.getJobID(),
-						((AccumulatorResultsErroneous) result).cause());
-			}
-			else {
-				throw new RuntimeException("RequestAccumulatorResults requires a response of type " +
-						"AccumulatorResultStringsFound. Instead the response is of type " +
-						result.getClass() + ".");
-			}
-
-			wrt.write("\"groupverticetimes\": {");
-			first = true;
-
-			for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
-				if (first) {
-					first = false;
-				} else {
-					wrt.write(",");
-				}
-
-				// Calculate start and end time for groupvertex
-				long started = Long.MAX_VALUE;
-				long ended = 0;
-
-				// Take earliest running state and latest endstate of groupmembers
-				for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
-					long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
-					if (running != 0 && running < started) {
-						started = running;
-					}
-
-					long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
-					long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
-					long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
-
-					if (finished != 0 && finished > ended) {
-						ended = finished;
-					}
-
-					if (canceled != 0 && canceled > ended) {
-						ended = canceled;
-					}
-
-					if (failed != 0 && failed > ended) {
-						ended = failed;
-					}
-
-				}
-
-				wrt.write("\"" + groupVertex.getJobVertexId() + "\": {");
-				wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
-				wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
-				wrt.write("\"STARTED\": " + started + ",");
-				wrt.write("\"ENDED\": " + ended);
-				wrt.write("}");
-
-			}
-
-			wrt.write("}");
-			wrt.write("}");
-			wrt.write("]");
-		}
-		catch (Exception ex) { // Connection closed by client
-			LOG.error("Info server for JobManager: Failed to write json for archived jobs", ex);
-		}
-	}
-
-	/**
-	 * Writes all updates (events) for a given job since a given time
-	 *
-	 * @param wrt
-	 * @param jobId
-	 */
-	private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
-
-		try {
-			final Future<Object> responseArchivedJobs = jobmanager.ask(
-					JobManagerMessages.getRequestRunningJobs(),
-					timeout);
-
-			Object resultArchivedJobs = null;
-
-			try{
-				resultArchivedJobs = Await.result(responseArchivedJobs, timeout);
-			} catch (Exception ex) {
-				throw new IOException("Could not retrieve archived jobs from the job manager.", ex);
-			}
-
-			if(!(resultArchivedJobs instanceof RunningJobs)){
-				throw new RuntimeException("RequestArchivedJobs requires a response of type " +
-						"RunningJobs. Instead the response is of type " +
-						resultArchivedJobs.getClass() + ".");
-			} else {
-				final Iterable<ExecutionGraph> graphs = ((RunningJobs)resultArchivedJobs).
-						asJavaIterable();
-
-				//Serialize job to json
-				wrt.write("{");
-				wrt.write("\"jobid\": \"" + jobId + "\",");
-				wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\",");
-				wrt.write("\"recentjobs\": [");
-
-				boolean first = true;
-
-				for(ExecutionGraph g : graphs){
-					if (first) {
-						first = false;
-					} else {
-						wrt.write(",");
-					}
-
-					wrt.write("\"" + g.getJobID() + "\"");
-				}
-
-				wrt.write("],");
-
-				final Future<Object> responseJob = jobmanager.ask(new RequestJob(jobId), timeout);
-
-				Object resultJob = null;
-
-				try{
-					resultJob = Await.result(responseJob, timeout);
-				} catch (Exception ex){
-					throw new IOException("Could not retrieve the job with jobID " + jobId +
-							"from the job manager.", ex);
-				}
-
-				if(!(resultJob instanceof JobResponse)) {
-					throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
-							"Instead the response is of type " + resultJob.getClass() + ".");
-				} else {
-					final JobResponse response = (JobResponse) resultJob;
-
-					if(response instanceof JobFound){
-						ExecutionGraph graph = ((JobFound)response).executionGraph();
-
-						wrt.write("\"vertexevents\": [");
-
-						first = true;
-						for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
-							if (first) {
-								first = false;
-							} else {
-								wrt.write(",");
-							}
-
-							wrt.write("{");
-							wrt.write("\"vertexid\": \"" + ev.getCurrentExecutionAttempt().getAttemptId()
-									+ "\",");
-							wrt.write("\"newstate\": \"" + ev.getExecutionState() + "\",");
-							wrt.write("\"timestamp\": \"" + ev.getStateTimestamp(ev.getExecutionState())
-									+ "\"");
-							wrt.write("}");
-						}
-
-						wrt.write("],");
-
-						wrt.write("\"jobevents\": [");
-
-						wrt.write("{");
-						wrt.write("\"newstate\": \"" + graph.getState() + "\",");
-						wrt.write("\"timestamp\": \"" + graph.getStatusTimestamp(graph.getState()) + "\"");
-						wrt.write("}");
-
-						wrt.write("]");
-
-						wrt.write("}");
-					} else {
-						wrt.write("\"vertexevents\": [],");
-						wrt.write("\"jobevents\": [");
-						wrt.write("{");
-						wrt.write("\"newstate\": \"" + JobStatus.FINISHED + "\",");
-						wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\"");
-						wrt.write("}");
-						wrt.write("]");
-						wrt.write("}");
-						LOG.warn("WriteJsonUpdatesForJob: Could not find job with job ID " + jobId);
-					}
-				}
-			}
-
-		} catch (Exception exception) { // Connection closed by client
-			LOG.info("Info server for jobmanager: Failed to write json updates for job {}, " +
-					"because {}.", jobId, StringUtils.stringifyException(exception));
-		}
-
-	}
-
-	/**
-	 * Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses.
-	 */
-	private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, ExecutionGraph graph,
-													JobVertexID vertexId) {
-		ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
-
-		// Serialize ManagementGraph to json
-		wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ",");
-
-		wrt.write("\"verticetimes\": {");
-		boolean first = true;
-		for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
-
-			for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
-				Execution exec = vertex.getCurrentExecutionAttempt();
-
-				if(first) {
-					first = false;
-				} else {
-					wrt.write(","); }
-
-				wrt.write("\""+exec.getAttemptId() +"\": {");
-				wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\",");
-				wrt.write("\"vertexname\": \"" + vertex + "\",");
-				wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
-				wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
-				wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
-				wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
-				wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
-				wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
-				wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
-				wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + "");
-				wrt.write("}");
-			}
-
-		}
-		wrt.write("}}");
-	}
-
-	/**
-	 * Writes the version and the revision of Flink.
-	 *
-	 * @param wrt
-	 */
-	private void writeJsonForVersion(PrintWriter wrt) {
-		wrt.write("{");
-		wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\",");
-		wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\"");
-		wrt.write("}");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
deleted file mode 100644
index 89e55d0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.util.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class JsonFactory {
-
-	public static String toJson(ExecutionVertex vertex) {
-		StringBuilder json = new StringBuilder("");
-		json.append("{");
-		json.append("\"vertexid\": \"" + vertex.getCurrentExecutionAttempt().getAttemptId() + "\",");
-		json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
-		json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
-		
-		InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
-		String instanceName = location == null ? "(null)" : location.getFQDNHostname();
-		
-		json.append("\"vertexinstancename\": \"" + instanceName + "\"");
-		json.append("}");
-		return json.toString();
-	}
-	
-	public static String toJson(ExecutionJobVertex jobVertex) {
-		StringBuilder json = new StringBuilder("");
-		
-		json.append("{");
-		json.append("\"groupvertexid\": \"" + jobVertex.getJobVertexId() + "\",");
-		json.append("\"groupvertexname\": \"" + StringUtils.escapeHtml(jobVertex.getJobVertex().getName()) + "\",");
-		json.append("\"numberofgroupmembers\": " + jobVertex.getParallelism() + ",");
-		json.append("\"groupmembers\": [");
-		
-		// Count state status of group members
-		Map<ExecutionState, Integer> stateCounts = new HashMap<ExecutionState, Integer>();
-		
-		// initialize with 0
-		for (ExecutionState state : ExecutionState.values()) {
-			stateCounts.put(state, Integer.valueOf(0));
-		}
-		
-		ExecutionVertex[] vertices = jobVertex.getTaskVertices();
-		
-		for (int j = 0; j < vertices.length; j++) {
-			ExecutionVertex vertex = vertices[j];
-			
-			json.append(toJson(vertex));
-			
-			// print delimiter
-			if (j != vertices.length - 1) {
-				json.append(",");
-			}
-			
-			// Increment state status count
-			int count =  stateCounts.get(vertex.getExecutionState()) + 1;
-			stateCounts.put(vertex.getExecutionState(), count);
-		}
-		
-		json.append("],");
-		json.append("\"backwardEdges\": [");
-		
-		List<IntermediateResult> inputs = jobVertex.getInputs();
-		
-		for (int inputNumber = 0; inputNumber < inputs.size(); inputNumber++) {
-			ExecutionJobVertex input = inputs.get(inputNumber).getProducer();
-			
-			json.append("{");
-			json.append("\"groupvertexid\": \"" + input.getJobVertexId() + "\",");
-			json.append("\"groupvertexname\": \"" +  StringUtils.escapeHtml(jobVertex.getJobVertex().getName()) + "\"");
-			json.append("}");
-			
-			// print delimiter
-			if(inputNumber != inputs.size() - 1) {
-				json.append(",");
-			}
-		}
-		json.append("]");
-		
-		// list number of members for each status
-		for (Map.Entry<ExecutionState, Integer> stateCount : stateCounts.entrySet()) {
-			json.append(",\""+stateCount.getKey()+"\": " + stateCount.getValue());
-		}
-		
-		json.append("}");
-		
-		return json.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
deleted file mode 100644
index ecffdfd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.util.StringUtils;
-
-public class LogfileInfoServlet extends HttpServlet {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * The log for this class.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(LogfileInfoServlet.class);
-
-	private File[] logDirs;
-
-
-	public LogfileInfoServlet(File[] logDirs) {
-		if(logDirs == null){
-			throw new NullPointerException("The given log files are null.");
-		}
-		this.logDirs = logDirs;
-	}
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-
-		try {
-			if("stdout".equals(req.getParameter("get"))) {
-				// Find current stdout file
-				sendFile(".*jobmanager-[^\\.]*\\.out", resp);
-			}
-			else {
-				// Find current logfile
-				sendFile(".*jobmanager-[^\\.]*\\.log", resp);
-			}
-		} catch (Throwable t) {
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			resp.getWriter().print("Error opening log files':"+t.getMessage());
-			if (LOG.isWarnEnabled()) {
-				LOG.warn(StringUtils.stringifyException(t));
-			}
-		}
-	}
-
-	private void sendFile(String fileNamePattern, HttpServletResponse resp) throws IOException {
-		for(File logDir: logDirs) {
-			if(logDir == null) {
-				continue;
-			}
-			File[] files = logDir.listFiles();
-			if(files == null) {
-				resp.setStatus(HttpServletResponse.SC_OK);
-				resp.setContentType("text/plain");
-				resp.getOutputStream().write(("The specified log directory '" + logDir + "' does not exist or cannot be read.").getBytes());
-			} else {
-				for (File f : files) {
-					// file name contains "jobmanager" and ends in ".log"/".out" without a rotation number -> needs improvement
-					if (f.getName().matches(fileNamePattern)) {
-						resp.setStatus(HttpServletResponse.SC_OK);
-						resp.setContentType("text/plain");
-						writeFile(resp.getOutputStream(), f);
-					}
-				}
-			}
-		}
-	}
-	private static void writeFile(OutputStream out, File file) throws IOException {
-		byte[] buf = new byte[4 * 1024]; // 4K buffer
-
-		FileInputStream  is = null;
-		try {
-			is = new FileInputStream(file);
-			out.write(("==== FILE: "+file.toString()+" ====\n").getBytes());
-			int bytesRead;
-			while ((bytesRead = is.read(buf)) != -1) {
-				out.write(buf, 0, bytesRead);
-			}
-		} finally {
-			if (is != null) {
-				is.close();
-			}
-		}
-	}
-
-}
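
The removed writeFile() closes its input stream in a finally block. A minimal sketch of
the same copy loop with Java 7 try-with-resources, for illustration only and not part
of this commit:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    static void writeFile(OutputStream out, File file) throws IOException {
        out.write(("==== FILE: " + file + " ====\n").getBytes());
        try (FileInputStream in = new FileInputStream(file)) {
            byte[] buf = new byte[4 * 1024]; // 4K buffer, as in the removed code
            int bytesRead;
            while ((bytesRead = in.read(buf)) != -1) {
                out.write(buf, 0, bytesRead);
            }
        }
    }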

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
deleted file mode 100644
index 6de1434..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Servlet that renders the navigation menu of the web interface.
- *
- */
-public class MenuServlet extends HttpServlet {
-
-	/**
-	 * Serial UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = 117543213991787547L;
-	
-	/**
-	 * The log for this class.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(MenuServlet.class);
-	
-	/**
-	 * Array of possible menu entries on the left
-	 */
-	private static final String[] entries =  {
-		"index", "history", "configuration", "taskmanagers"
-	};
-	
-	/**
-	 * The names of the menu entries shown in the browser
-	 */
-	private static final String[] names = {
-		"Dashboard", "History", "Configuration", "Task Managers"
-	};
-	
-	/**
-	 * The classes of the icons shown next to the names in the browser
-	 */
-	private static final String[] classes = {
-		"fa fa-dashboard", "fa fa-bar-chart-o", "fa fa-keyboard-o", "fa fa-building-o"
-	};
-	
-	public MenuServlet() {
-		if (names.length != entries.length || names.length != classes.length) {
-			LOG.error("The arrays 'entries', 'classes' and 'names' differ in their length. This is not allowed!");
-		}
-	}
-	
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
-			throws ServletException, IOException {
-		
-		resp.setStatus(HttpServletResponse.SC_OK);
-		resp.setContentType("application/json");
-		
-		if ("index".equals(req.getParameter("get"))) {
-			writeMenu("index", resp);
-		} else if ("analyze".equals(req.getParameter("get"))) {
-			writeMenu("analyze", resp);
-		} else if ("history".equals(req.getParameter("get"))) {
-			writeMenu("history", resp);
-		} else if ("configuration".equals(req.getParameter("get"))) {
-			writeMenu("configuration", resp);
-		} else if ("taskmanagers".equals(req.getParameter("get"))) {
-			writeMenu("taskmanagers", resp);
-		}
-
-	}
-	
-	private void writeMenu(String me, HttpServletResponse resp) throws IOException {
-		
-		String r = "";
-		
-		for (int i = 0; i < entries.length; i++) {
-			if (entries[i].equals(me)) {
-				r += writeLine(3, "<li class='active'><a href='"+ entries[i] +".html'><i class='"+ classes[i] +"'></i> "+ names[i] +"</a></li>");
-			} else {
-				r += writeLine(3, "<li><a href='"+ entries[i] +".html'><i class='"+ classes[i] +"'></i> "+ names[i] +"</a></li>");
-			}
-		}
-		
-		resp.getWriter().write(r);
-	}
-	
-	private String writeLine(int tab, String line) {
-		String s = "";
-		for (int i = 0; i < tab; i++) {
-			s += "\t";
-		}
-		s+= " " + line + " \n";
-		return s;
-	}
-	
-}
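
The removed MenuServlet keeps the page names, labels and icon classes in three parallel
arrays and has to verify their lengths in the constructor. Below is a sketch of the same
menu definition as a single table; it is an illustration only, not part of this commit:

    import java.io.IOException;
    import javax.servlet.http.HttpServletResponse;

    // Each row: page, label shown in the browser, icon class.
    private static final String[][] MENU = {
        { "index",         "Dashboard",     "fa fa-dashboard"   },
        { "history",       "History",       "fa fa-bar-chart-o" },
        { "configuration", "Configuration", "fa fa-keyboard-o"  },
        { "taskmanagers",  "Task Managers", "fa fa-building-o"  },
    };

    private void writeMenu(String active, HttpServletResponse resp) throws IOException {
        StringBuilder out = new StringBuilder();
        for (String[] entry : MENU) {
            out.append("\t\t\t <li").append(entry[0].equals(active) ? " class='active'" : "")
               .append("><a href='").append(entry[0]).append(".html'><i class='")
               .append(entry[2]).append("'></i> ").append(entry[1]).append("</a></li> \n");
        }
        resp.getWriter().write(out.toString());
    }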

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
deleted file mode 100644
index 1f2bfe0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.Instance;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestStackTrace;
-import org.apache.flink.runtime.messages.TaskManagerMessages.StackTrace;
-import org.apache.flink.util.StringUtils;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * A Servlet that displays the Configuration in the web interface.
- */
-public class SetupInfoServlet extends HttpServlet {
-
-	/** Serial UID for serialization interoperability. */
-	private static final long serialVersionUID = 3704963598772630435L;
-
-	/** The log for this class. */
-	private static final Logger LOG = LoggerFactory.getLogger(SetupInfoServlet.class);
-
-
-	final private Configuration configuration;
-	final private ActorGateway jobmanager;
-	final private FiniteDuration timeout;
-
-
-	public SetupInfoServlet(Configuration conf, ActorGateway jobManager, FiniteDuration timeout) {
-		configuration = conf;
-		this.jobmanager = jobManager;
-		this.timeout = timeout;
-	}
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
-			throws ServletException, IOException {
-		resp.setStatus(HttpServletResponse.SC_OK);
-		resp.setContentType("application/json");
-
-		if ("globalC".equals(req.getParameter("get"))) {
-			writeGlobalConfiguration(resp);
-		} else if ("taskmanagers".equals(req.getParameter("get"))) {
-			writeTaskmanagers(resp);
-		} else if ("stackTrace".equals(req.getParameter("get"))) {
-			String instanceId = req.getParameter("instanceID");
-			writeStackTraceOfTaskManager(instanceId, resp);
-		}
-	}
-
-	private void writeGlobalConfiguration(HttpServletResponse resp) throws IOException {
-		Set<String> keys = configuration.keySet();
-		List<String> list = new ArrayList<String>(keys);
-		Collections.sort(list);
-
-		JSONObject obj = new JSONObject();
-		for (String k : list) {
-			try {
-
-				obj.put(k, configuration.getString(k, ""));
-			} catch (JSONException e) {
-				LOG.warn("Json object creation failed", e);
-			}
-		}
-
-		PrintWriter w = resp.getWriter();
-		w.write(obj.toString());
-	}
-
-	private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
-
-		final Future<Object> response = jobmanager.ask(
-				JobManagerMessages.getRequestRegisteredTaskManagers(),
-				timeout);
-
-		Object obj = null;
-
-		try{
-			obj = Await.result(response, timeout);
-		} catch (Exception ex) {
-			throw new IOException("Could not retrieve all registered task managers from the " +
-					"job manager.", ex);
-		}
-
-		if(!(obj instanceof RegisteredTaskManagers)){
-			throw new RuntimeException("RequestRegisteredTaskManagers should return a response of " +
-					"type RegisteredTaskManagers. Instead the response is of type " +
-					obj.getClass() + ".");
-		} else {
-
-			final List<Instance> instances = new ArrayList<Instance>(
-					((RegisteredTaskManagers) obj).asJavaCollection());
-
-			Collections.sort(instances, INSTANCE_SORTER);
-
-			JSONObject jsonObj = new JSONObject();
-			JSONArray array = new JSONArray();
-			for (Instance instance : instances) {
-				JSONObject objInner = new JSONObject();
-
-				long time = new Date().getTime() - instance.getLastHeartBeat();
-
-				try {
-					objInner.put("path", instance.getActorGateway().path());
-					objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
-					objInner.put("timeSinceLastHeartbeat", time / 1000);
-					objInner.put("slotsNumber", instance.getTotalNumberOfSlots());
-					objInner.put("freeSlots", instance.getNumberOfAvailableSlots());
-					objInner.put("cpuCores", instance.getResources().getNumberOfCPUCores());
-					objInner.put("physicalMemory", instance.getResources().getSizeOfPhysicalMemory() >>> 20);
-					objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
-					objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
-					objInner.put("instanceID", instance.getId());
-					byte[] report = instance.getLastMetricsReport();
-					if(report != null) {
-						objInner.put("metrics", new JSONObject(new String(report, "utf-8")));
-					}
-					array.put(objInner);
-				} catch (JSONException e) {
-					LOG.warn("Json object creation failed", e);
-				}
-
-			}
-			try {
-				jsonObj.put("taskmanagers", array);
-			} catch (JSONException e) {
-				LOG.warn("Json object creation failed", e);
-			}
-
-			PrintWriter w = resp.getWriter();
-			w.write(jsonObj.toString());
-		}
-	}
-
-
-	private void writeStackTraceOfTaskManager(String instanceIdStr, HttpServletResponse resp) throws IOException {
-		InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(instanceIdStr));
-		StackTrace message = null;
-		Throwable exception = null;
-
-		final Future<Object> response = jobmanager.ask(
-				new RequestStackTrace(instanceID),
-				timeout);
-
-		try {
-			message = (StackTrace) Await.result(response, timeout);
-		} catch (Exception ex) {
-			exception = ex;
-		}
-
-		JSONObject obj = new JSONObject();
-		try {
-			if (message != null) {
-				obj.put("stackTrace", message.stackTrace());
-			} else if (exception != null) {
-				obj.put("errorMessage", exception.getMessage());
-			}
-		} catch (JSONException e) {
-			LOG.warn("Json object creation failed", e);
-		}
-
-		PrintWriter writer = resp.getWriter();
-		writer.write(obj.toString());
-	}
-	// --------------------------------------------------------------------------------------------
-
-	private static final Comparator<Instance> INSTANCE_SORTER = new Comparator<Instance>() {
-		@Override
-		public int compare(Instance o1, Instance o2) {
-			return o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo());
-		}
-	};
-}
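
The removed servlet talks to the JobManager through the ActorGateway ask/Await pattern.
Condensed to its core, and only as an illustration of that pattern (variable names are
taken from the code above, error handling simplified):

    import java.io.IOException;
    import org.apache.flink.runtime.messages.JobManagerMessages;
    import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
    import scala.concurrent.Await;
    import scala.concurrent.Future;

    try {
        // ask the leading JobManager for the currently registered task managers
        Future<Object> future = jobmanager.ask(
                JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
        RegisteredTaskManagers taskManagers =
                (RegisteredTaskManagers) Await.result(future, timeout);
        // ... render taskManagers.asJavaCollection() as JSON, as in writeTaskmanagers() ...
    } catch (Exception e) {
        throw new IOException("Could not retrieve the registered task managers.", e);
    }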

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
deleted file mode 100644
index 21a1f51..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URL;
-import java.util.UUID;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import com.google.common.base.Preconditions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.webmonitor.WebMonitor;
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
-import org.eclipse.jetty.server.handler.ResourceHandler;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * This class sets up a web-server that contains a web frontend to display information about running jobs.
- * It instantiates and configures an embedded jetty server.
- */
-public class WebInfoServer implements WebMonitor, LeaderRetrievalListener {
-
-	/** Web root dir in the jar */
-	private static final String WEB_ROOT_DIR = "web-docs-infoserver";
-
-	/** The log for this class. */
-	private static final Logger LOG = LoggerFactory.getLogger(WebInfoServer.class);
-
-	/** The jetty server serving all requests. */
-	private final Server server;
-
-	/** Retrieval service for the current leading JobManager */
-	private final LeaderRetrievalService leaderRetrievalService;
-
-	/** ActorSystem used to retrieve the ActorRefs */
-	private final ActorSystem actorSystem;
-
-	/** Collection for the registered jetty handlers */
-	private final HandlerCollection handlers;
-
-	/** Associated configuration */
-	private final Configuration config;
-
-	/** Timeout for the servlets */
-	private final FiniteDuration timeout;
-
-	/** Actor look up timeout */
-	private final FiniteDuration lookupTimeout;
-
-	/** Default jetty handler responsible for serving static content */
-	private final ResourceHandler resourceHandler;
-
-	/** File paths to log dirs */
-	final File[] logDirFiles;
-
-	/** The assigned port where jetty is running. */
-	private int assignedPort = -1;
-
-	/**
-	 * Creates a new web info server. The server runs the servlets that implement the logic
-	 * to list all present information concerning the job manager
-	 *
-	 * @param config The Flink configuration.
-	 * @param leaderRetrievalService Retrieval service to obtain the current leader
-	 *
-	 * @throws IOException
-	 *         Thrown, if the server setup failed for an I/O related reason.
-	 */
-	public WebInfoServer(
-			Configuration config,
-			LeaderRetrievalService leaderRetrievalService,
-			ActorSystem actorSystem)
-		throws IOException {
-		if (config == null) {
-			throw new IllegalArgumentException("No Configuration has been passed to the web server");
-		}
-
-		this.config = config;
-
-		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
-
-		// if port == 0, jetty will assign an available port.
-		int port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
-		if (port < 0) {
-			throw new IllegalArgumentException("Invalid port for the webserver: " + port);
-		}
-
-		timeout = AkkaUtils.getTimeout(config);
-		lookupTimeout = AkkaUtils.getLookupTimeout(config);
-
-		this.actorSystem = actorSystem;
-
-		// get base path of Flink installation
-		final String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
-		final String[] logDirPaths = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY,
-				basePath+"/log").split(","); // YARN allows to specify multiple log directories
-
-		URL webRootDir = this.getClass().getClassLoader().getResource(WEB_ROOT_DIR);
-
-		if(webRootDir == null) {
-			throw new FileNotFoundException("Cannot start JobManager web info server. The " +
-					"resource " + WEB_ROOT_DIR + " is not included in the jar.");
-		}
-
-		logDirFiles = new File[logDirPaths.length];
-		int i = 0;
-		for(String path : logDirPaths) {
-			logDirFiles[i++] = new File(path);
-		}
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Setting up web info server, using web-root directory " +
-					webRootDir.toExternalForm()	+ ".");
-
-		}
-
-		server = new Server(port);
-
-		// ----- the handler serving all the static files -----
-		resourceHandler = new ResourceHandler();
-		resourceHandler.setDirectoriesListed(false);
-		resourceHandler.setResourceBase(webRootDir.toExternalForm());
-
-		// ----- add the handlers to the list handler -----
-
-		// make the HandlerCollection mutable so that we can update it later on
-		handlers = new HandlerCollection(true);
-		handlers.addHandler(resourceHandler);
-		server.setHandler(handlers);
-	}
-
-	/**
-	 * Starts the web frontend server.
-	 *
-	 * @throws Exception
-	 *         Thrown, if the start fails.
-	 */
-	public void start() throws Exception {
-		server.start();
-		
-		final Connector[] connectors = server.getConnectors();
-		if (connectors != null && connectors.length > 0) {
-			Connector conn = connectors[0];
-
-			// we have to use getLocalPort() instead of getPort() http://stackoverflow.com/questions/8884865/how-to-discover-jetty-7-running-port
-			this.assignedPort = conn.getLocalPort(); 
-			String host = conn.getHost();
-			if (host == null) { // as per method documentation
-				host = "0.0.0.0";
-			}
-			LOG.info("Started web info server for JobManager on {}:{}", host, assignedPort);
-		}
-		else {
-			LOG.warn("Unable to determine local endpoint of web frontend server");
-		}
-
-		leaderRetrievalService.start(this);
-	}
-
-	/**
-	 * Stop the webserver
-	 */
-	public void stop() throws Exception {
-		leaderRetrievalService.stop();
-		server.stop();
-		assignedPort = -1;
-	}
-
-	public int getServerPort() {
-		return this.assignedPort;
-	}
-
-	@Override
-	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-
-		if(leaderAddress != null && !leaderAddress.equals("")) {
-			try {
-				ActorRef jobManager = AkkaUtils.getActorRef(
-					leaderAddress,
-					actorSystem,
-					lookupTimeout);
-				ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
-
-				Future<Object> archiveFuture = jobManagerGateway.ask(
-					JobManagerMessages.getRequestArchive(),
-					timeout);
-
-				ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(
-					archiveFuture,
-					timeout)).actor();
-
-				ActorGateway archiveGateway = new AkkaActorGateway(archive, leaderSessionID);
-
-				updateHandler(jobManagerGateway, archiveGateway);
-			} catch (Exception e) {
-				handleError(e);
-			}
-		}
-	}
-
-	@Override
-	public void handleError(Exception exception) {
-		LOG.error("Received error from LeaderRetrievalService.", exception);
-
-		try{
-			// stop the whole web server
-			stop();
-		} catch (Exception e) {
-			LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
-		}
-	}
-
-	/**
-	 * Updates the Flink handlers with the current leading JobManager and archive
-	 *
-	 * @param jobManager ActorGateway to the current JobManager leader
-	 * @param archive ActorGateway to the current archive of the leading JobManager
-	 * @throws Exception
-	 */
-	private void updateHandler(ActorGateway jobManager, ActorGateway archive) throws Exception {
-		// ----- the handlers for the servlets -----
-		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
-		servletContext.setContextPath("/");
-		servletContext.addServlet(
-				new ServletHolder(
-						new JobManagerInfoServlet(
-								jobManager,
-								archive,
-								timeout)),
-				"/jobsInfo");
-		servletContext.addServlet(
-				new ServletHolder(
-						new LogfileInfoServlet(
-								logDirFiles)),
-				"/logInfo");
-		servletContext.addServlet(
-				new ServletHolder(
-						new SetupInfoServlet(
-								config,
-								jobManager,
-								timeout)),
-				"/setupInfo");
-		servletContext.addServlet(
-				new ServletHolder(
-						new MenuServlet()),
-				"/menu");
-
-		// replace old handlers with new ones
-		handlers.setHandlers(new Handler[]{resourceHandler, servletContext});
-
-		// start new handler
-		servletContext.start();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 31d9aae..4fca270 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -18,12 +18,25 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Utilities for the web runtime monitor. This class contains for example methods to build
@@ -32,55 +45,85 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
  */
 public final class WebMonitorUtils {
 
+	private static final Logger LOG = LoggerFactory.getLogger(WebMonitorUtils.class);
+
+	/**
+	 * Starts the web runtime monitor. Because the actual implementation of the runtime monitor is
+	 * in another project, we load the runtime monitor dynamically.
+	 * <p/>
+	 * Because failure to start the web runtime monitor is not considered fatal, this method does
+	 * not throw any exceptions, but only logs them.
+	 *
+	 * @param config                 The configuration for the runtime monitor.
+	 * @param leaderRetrievalService Leader retrieval service to get the leading JobManager
+	 */
+	public static WebMonitor startWebRuntimeMonitor(
+			Configuration config,
+			LeaderRetrievalService leaderRetrievalService,
+			ActorSystem actorSystem) {
+		// try to load and instantiate the class
+		try {
+			String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+			Class clazz = Class.forName(classname).asSubclass(WebMonitor.class);
+			@SuppressWarnings("unchecked")
+			Constructor<WebMonitor> constructor = clazz.getConstructor(Configuration.class,
+					LeaderRetrievalService.class,
+					ActorSystem.class);
+			return constructor.newInstance(config, leaderRetrievalService, actorSystem);
+		} catch (ClassNotFoundException e) {
+			LOG.error("Could not load web runtime monitor. " +
+					"Probable reason: flink-runtime-web is not in the classpath");
+			LOG.debug("Caught exception", e);
+			return null;
+		} catch (InvocationTargetException e) {
+			LOG.error("WebServer could not be created", e.getTargetException());
+			return null;
+		} catch (Throwable t) {
+			LOG.error("Failed to instantiate web runtime monitor.", t);
+			return null;
+		}
+	}
+
+	public static Map<String, String> fromKeyValueJsonArray (JSONArray parsed) throws JSONException {
+		Map<String, String> hashMap = new HashMap<>();
+
+		for (int i = 0; i < parsed.length(); i++) {
+			JSONObject jsonObject = parsed.getJSONObject(i);
+			String key = jsonObject.getString("key");
+			String value = jsonObject.getString("value");
+			hashMap.put(key, value);
+		}
+
+		return hashMap;
+	}
+
 	public static JobDetails createDetailsForJob(ExecutionGraph job) {
 		JobStatus status = job.getState();
-		
+
 		long started = job.getStatusTimestamp(JobStatus.CREATED);
 		long finished = status.isTerminalState() ? job.getStatusTimestamp(status) : -1L;
-		
+
 		int[] countsPerStatus = new int[ExecutionState.values().length];
 		long lastChanged = 0;
 		int numTotalTasks = 0;
-		
+
 		for (ExecutionJobVertex ejv : job.getVerticesTopologically()) {
 			ExecutionVertex[] vertices = ejv.getTaskVertices();
 			numTotalTasks += vertices.length;
-			
+
 			for (ExecutionVertex vertex : vertices) {
 				ExecutionState state = vertex.getExecutionState();
 				countsPerStatus[state.ordinal()]++;
 				lastChanged = Math.max(lastChanged, vertex.getStateTimestamp(state));
 			}
 		}
-		
+
 		lastChanged = Math.max(lastChanged, finished);
-		
+
 		return new JobDetails(job.getJobID(), job.getJobName(),
-				started, finished, status, lastChanged,  
+				started, finished, status, lastChanged,
 				countsPerStatus, numTotalTasks);
 	}
-	
-	public static void aggregateExecutionStateTimestamps(long[] timestamps, long[] other) {
-		timestamps[CREATED_POS] = Math.min(timestamps[CREATED_POS], other[CREATED_POS]);
-		timestamps[SCHEDULED_POS] = Math.min(timestamps[SCHEDULED_POS], other[SCHEDULED_POS]);
-		timestamps[DEPLOYING_POS] = Math.min(timestamps[DEPLOYING_POS], other[DEPLOYING_POS]);
-		timestamps[RUNNING_POS] = Math.min(timestamps[RUNNING_POS], other[RUNNING_POS]);
-		timestamps[FINISHED_POS] = Math.max(timestamps[FINISHED_POS], other[FINISHED_POS]);
-		timestamps[CANCELING_POS] = Math.min(timestamps[CANCELING_POS], other[CANCELING_POS]);
-		timestamps[CANCELED_POS] = Math.max(timestamps[CANCELED_POS], other[CANCELED_POS]);
-		timestamps[FAILED_POS] = Math.min(timestamps[FAILED_POS], other[FAILED_POS]);
-	}
-	
-	// ------------------------------------------------------------------------
-
-	private static final int CREATED_POS = ExecutionState.CREATED.ordinal();
-	private static final int SCHEDULED_POS = ExecutionState.SCHEDULED.ordinal();
-	private static final int DEPLOYING_POS = ExecutionState.DEPLOYING.ordinal();
-	private static final int RUNNING_POS = ExecutionState.RUNNING.ordinal();
-	private static final int FINISHED_POS = ExecutionState.FINISHED.ordinal();
-	private static final int CANCELING_POS = ExecutionState.CANCELING.ordinal();
-	private static final int CANCELED_POS = ExecutionState.CANCELED.ordinal();
-	private static final int FAILED_POS = ExecutionState.FAILED.ordinal();
 
 	/**
 	 * Private constructor to prevent instantiation.
@@ -88,4 +131,6 @@ public final class WebMonitorUtils {
 	private WebMonitorUtils() {
 		throw new RuntimeException();
 	}
-}
+
+
+}
\ No newline at end of file
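
A hedged sketch of how a caller might use the reflective factory added above. Only the
startWebRuntimeMonitor signature comes from this diff; the surrounding variable names
and the start() call (declared to throw Exception in the removed WebInfoServer) are
assumptions:

    import akka.actor.ActorSystem;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    import org.apache.flink.runtime.webmonitor.WebMonitor;
    import org.apache.flink.runtime.webmonitor.WebMonitorUtils;

    WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor(config, leaderRetrievalService, actorSystem);
    if (monitor != null) { // null means flink-runtime-web was not on the classpath or failed to load
        try {
            monitor.start();
        } catch (Exception e) {
            // a failure to start the monitor is not treated as fatal here
        }
    }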