You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/17 21:03:41 UTC
[21/21] flink git commit: [FLINK-2844] [web frontend] Remove old web
interface
[FLINK-2844] [web frontend] Remove old web interface
- make new web one the default
- adapt tests
- make web directory a resource to be included in the fat jar
- serve static files of web interface dynamic through the class loader
- run on YARN
- remove Jetty dependencies from poms
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df448625
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df448625
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df448625
Branch: refs/heads/master
Commit: df448625817817ecdc897f2e14b03bfff426467d
Parents: a8eeb3b
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Oct 9 17:17:52 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Oct 17 18:45:02 2015 +0200
----------------------------------------------------------------------
flink-clients/pom.xml | 8 +-
.../flink/configuration/ConfigConstants.java | 14 +-
flink-dist/src/main/assemblies/bin.xml | 7 -
flink-runtime-web/pom.xml | 12 +
.../runtime/webmonitor/WebMonitorConfig.java | 25 +-
.../runtime/webmonitor/WebRuntimeMonitor.java | 114 +-
.../files/StaticFileServerHandler.java | 18 +
flink-runtime/pom.xml | 12 -
.../jobmanager/web/JobManagerInfoServlet.java | 737 --
.../runtime/jobmanager/web/JsonFactory.java | 112 -
.../jobmanager/web/LogfileInfoServlet.java | 116 -
.../runtime/jobmanager/web/MenuServlet.java | 120 -
.../jobmanager/web/SetupInfoServlet.java | 216 -
.../runtime/jobmanager/web/WebInfoServer.java | 293 -
.../runtime/webmonitor/WebMonitorUtils.java | 105 +-
.../resources/web-docs-infoserver/analyze.html | 162 -
.../web-docs-infoserver/blank-page.html | 115 -
.../web-docs-infoserver/configuration.html | 121 -
.../web-docs-infoserver/css/bootstrap.css | 5831 -----------
.../web-docs-infoserver/css/bootstrap.css.map | 1 -
.../web-docs-infoserver/css/bootstrap.min.css | 7 -
.../web-docs-infoserver/css/nephelefrontend.css | 21 -
.../web-docs-infoserver/css/rickshaw.min.css | 1 -
.../web-docs-infoserver/css/sb-admin.css | 164 -
.../web-docs-infoserver/css/timeline.css | 204 -
.../font-awesome/css/font-awesome.css | 1338 ---
.../font-awesome/css/font-awesome.min.css | 4 -
.../font-awesome/fonts/FontAwesome.otf | Bin 62856 -> 0 bytes
.../font-awesome/fonts/fontawesome-webfont.eot | Bin 38205 -> 0 bytes
.../font-awesome/fonts/fontawesome-webfont.svg | 414 -
.../font-awesome/fonts/fontawesome-webfont.ttf | Bin 80652 -> 0 bytes
.../font-awesome/fonts/fontawesome-webfont.woff | Bin 44432 -> 0 bytes
.../font-awesome/less/bordered-pulled.less | 16 -
.../font-awesome/less/core.less | 12 -
.../font-awesome/less/fixed-width.less | 6 -
.../font-awesome/less/font-awesome.less | 17 -
.../font-awesome/less/icons.less | 412 -
.../font-awesome/less/larger.less | 13 -
.../font-awesome/less/list.less | 19 -
.../font-awesome/less/mixins.less | 20 -
.../font-awesome/less/path.less | 14 -
.../font-awesome/less/rotated-flipped.less | 9 -
.../font-awesome/less/spinning.less | 30 -
.../font-awesome/less/stacked.less | 20 -
.../font-awesome/less/variables.less | 381 -
.../font-awesome/scss/_bordered-pulled.scss | 16 -
.../font-awesome/scss/_core.scss | 12 -
.../font-awesome/scss/_fixed-width.scss | 6 -
.../font-awesome/scss/_icons.scss | 412 -
.../font-awesome/scss/_larger.scss | 13 -
.../font-awesome/scss/_list.scss | 19 -
.../font-awesome/scss/_mixins.scss | 20 -
.../font-awesome/scss/_path.scss | 14 -
.../font-awesome/scss/_rotated-flipped.scss | 9 -
.../font-awesome/scss/_spinning.scss | 30 -
.../font-awesome/scss/_stacked.scss | 20 -
.../font-awesome/scss/_variables.scss | 381 -
.../font-awesome/scss/font-awesome.scss | 17 -
.../resources/web-docs-infoserver/history.html | 126 -
.../web-docs-infoserver/img/flink-logo.png | Bin 6096 -> 0 bytes
.../resources/web-docs-infoserver/index.html | 223 -
.../web-docs-infoserver/js/analyzer.js | 329 -
.../web-docs-infoserver/js/bootstrap.js | 1951 ----
.../web-docs-infoserver/js/configuration.js | 40 -
.../web-docs-infoserver/js/d3.layout.min.js | 1 -
.../resources/web-docs-infoserver/js/d3.min.js | 2 -
.../resources/web-docs-infoserver/js/helpers.js | 40 -
.../web-docs-infoserver/js/jcanvas.min.js | 61 -
.../js/jobmanagerFrontend.js | 470 -
.../web-docs-infoserver/js/jquery-2.1.0.js | 9111 ------------------
.../web-docs-infoserver/js/rickshaw.min.js | 3 -
.../web-docs-infoserver/js/taskmanager.js | 464 -
.../web-docs-infoserver/js/timeline.js | 6444 -------------
.../web-docs-infoserver/taskmanagers.html | 182 -
.../flink/runtime/jobmanager/JobManager.scala | 71 +-
.../runtime/minicluster/FlinkMiniCluster.scala | 24 +-
flink-tests/pom.xml | 9 +-
.../flink/test/web/WebFrontendITCase.java | 76 +-
.../flink/yarn/YARNSessionFIFOITCase.java | 31 +-
pom.xml | 30 -
80 files changed, 276 insertions(+), 31642 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 84264f9..928ba4f 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -73,23 +73,23 @@ under the License.
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
-
+
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <!-- version is derived from base module -->
+ <version>8.0.0.M1</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
- <!-- version is derived from base module -->
+ <version>8.0.0.M1</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <!-- version is derived from base module -->
+ <version>8.0.0.M1</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b1ffdd8..5d6f1c7 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -293,23 +293,15 @@ public final class ConfigConstants {
public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
/**
- * The option that specifies whether to use the new web frontend
- */
- public static final String JOB_MANAGER_NEW_WEB_FRONTEND_KEY = "jobmanager.new-web-frontend";
-
- /**
* The config parameter defining the number of archived jobs for the jobmanager
*/
public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";
-
+
public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath";
- /** The directory where the web server's static contents is stored */
- public static final String JOB_MANAGER_WEB_DOC_ROOT_KEY = "jobmanager.web.docroot";
-
-
+
// ------------------------------ Web Client ------------------------------
-
+
/**
* The config parameter defining port for the pact web-frontend server.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 09102ef..e20e94a 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -123,13 +123,6 @@ under the License.
<fileMode>0644</fileMode>
</fileSet>
- <!-- copy the web documents -->
- <fileSet>
- <directory>../flink-runtime-web/web-dashboard/web</directory>
- <outputDirectory>resources/web-runtime-monitor</outputDirectory>
- <fileMode>0644</fileMode>
- </fileSet>
-
<!-- copy the tools -->
<fileSet>
<directory>src/main/flink-bin/tools</directory>
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index afe71b4..ffb68bc 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -34,6 +34,18 @@ under the License.
<packaging>jar</packaging>
+ <build>
+ <resources>
+ <resource>
+ <!-- Only include the web folder from the web-dashboard directory -->
+ <directory>web-dashboard</directory>
+ <includes>
+ <include>web/**</include>
+ </includes>
+ </resource>
+ </resources>
+ </build>
+
<dependencies>
<!-- ===================================================
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index c8e64c9..5b537b7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -31,13 +31,10 @@ public class WebMonitorConfig {
/** The port for the runtime monitor web-frontend server. */
public static final String JOB_MANAGER_WEB_PORT_KEY = ConfigConstants.JOB_MANAGER_WEB_PORT_KEY;
- /** The directory where the web server's static contents is stored */
- public static final String JOB_MANAGER_WEB_DOC_ROOT_KEY = ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY;
-
/** The initial refresh interval for the web dashboard */
public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = "jobmanager.web.refresh-interval";
-
-
+
+
// ------------------------------------------------------------------------
// Default values
// ------------------------------------------------------------------------
@@ -47,32 +44,28 @@ public class WebMonitorConfig {
/** Default refresh interval for the web dashboard (= 3000 msecs) */
public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000;
-
-
+
+
// ------------------------------------------------------------------------
// Config
// ------------------------------------------------------------------------
-
+
/** The configuration queried by this config object */
private final Configuration config;
-
+
public WebMonitorConfig(Configuration config) {
if (config == null) {
throw new NullPointerException();
}
this.config = config;
}
-
-
+
+
public int getWebFrontendPort() {
return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
}
-
- public String getWebRoot() {
- return config.getString(JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
- }
-
+
public long getRefreshInterval() {
return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 96da0c8..7c252dc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -32,9 +32,8 @@ import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Router;
import io.netty.handler.stream.ChunkedWriteHandler;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -66,7 +65,9 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -82,15 +83,12 @@ public class WebRuntimeMonitor implements WebMonitor {
/** By default, all requests to the JobManager have a timeout of 10 seconds */
public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
-
+
/** Logger for web frontend startup / shutdown messages */
private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
-
- /** Teh default path under which the static contents is stored */
- private static final String STATIC_CONTENTS_PATH = "resources/web-runtime-monitor";
-
+
// ------------------------------------------------------------------------
-
+
/** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
private final Object startupShutdownLock = new Object();
@@ -98,48 +96,34 @@ public class WebRuntimeMonitor implements WebMonitor {
/** LeaderRetrievalListener which stores the currently leading JobManager and its archive */
private final JobManagerArchiveRetriever retriever;
-
+
private final Router router;
private final int configuredPort;
private ServerBootstrap bootstrap;
-
+
private Channel serverChannel;
-
+ private final File webRootDir;
+
+ private AtomicBoolean isShutdown = new AtomicBoolean();
+
+
public WebRuntimeMonitor(
Configuration config,
LeaderRetrievalService leaderRetrievalService,
ActorSystem actorSystem) throws IOException
{
this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
-
+
final WebMonitorConfig cfg = new WebMonitorConfig(config);
-
- // figure out where our static contents is
- final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
- final String configuredWebRoot = cfg.getWebRoot();
-
- final File webRootDir;
- if (configuredWebRoot != null) {
- webRootDir = new File(configuredWebRoot);
- }
- else if (flinkRoot != null) {
- webRootDir = new File(flinkRoot, STATIC_CONTENTS_PATH);
- }
- else {
- throw new IllegalConfigurationException("The given configuration provides neither the web-document root ("
- + WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY + "), not the Flink installation root ("
- + ConfigConstants.FLINK_BASE_DIR_PATH_KEY + ").");
- }
-
- // validate that the doc root is a valid directory
- if (!(webRootDir.exists() && webRootDir.isDirectory() && webRootDir.canRead())) {
- throw new IllegalConfigurationException("The path to the static contents (" +
- webRootDir.getAbsolutePath() + ") is not a readable directory.");
- }
-
+
+ // create an empty directory in temp for the web server
+ String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
+ webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
+ LOG.info("Using directory {} for the web interface files", webRootDir);
+
// port configuration
this.configuredPort = cfg.getWebFrontendPort();
if (this.configuredPort < 0) {
@@ -150,7 +134,7 @@ public class WebRuntimeMonitor implements WebMonitor {
FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);
retriever = new JobManagerArchiveRetriever(this, actorSystem, lookupTimeout, timeout);
-
+
ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(retriever);
router = new Router()
@@ -200,13 +184,29 @@ public class WebRuntimeMonitor implements WebMonitor {
if (this.bootstrap != null) {
throw new IllegalStateException("The server has already been started");
}
-
+
+ // add shutdown hook for deleting the directory
+ try {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ });
+ } catch (IllegalStateException e) {
+ // race, JVM is in shutdown already, we can safely ignore this
+ LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
+ } catch(Throwable t) {
+ // these errors usually happen when the shutdown is already in progress
+ LOG.warn("Error while adding shutdown hook", t);
+ }
+
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
Handler handler = new Handler(router);
-
+
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
@@ -214,34 +214,34 @@ public class WebRuntimeMonitor implements WebMonitor {
.addLast(handler.name(), handler);
}
};
-
+
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
-
+
this.bootstrap = new ServerBootstrap();
this.bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
-
+
Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
this.serverChannel = ch;
-
+
InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();
-
+
LOG.info("Web frontend listening at " + address + ':' + port);
leaderRetrievalService.start(retriever);
}
}
-
+
@Override
public void stop() throws Exception {
synchronized (startupShutdownLock) {
leaderRetrievalService.stop();
-
+
if (this.serverChannel != null) {
this.serverChannel.close().awaitUninterruptibly();
this.serverChannel = null;
@@ -252,9 +252,11 @@ public class WebRuntimeMonitor implements WebMonitor {
}
this.bootstrap = null;
}
+
+ shutdown();
}
}
-
+
@Override
public int getServerPort() {
Channel server = this.serverChannel;
@@ -266,14 +268,26 @@ public class WebRuntimeMonitor implements WebMonitor {
LOG.error("Cannot access local server port", e);
}
}
-
+
return -1;
}
-
+
+ private void shutdown() {
+ if (!isShutdown.compareAndSet(false, true)) {
+ return;
+ }
+ try {
+ LOG.info("Removing web root dir {}", webRootDir);
+ FileUtils.deleteDirectory(webRootDir);
+ } catch (Throwable t) {
+ LOG.warn("Error while deleting web root dir {}", webRootDir, t);
+ }
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
-
+
private static RuntimeMonitorHandler handler(RequestHandler handler) {
return new RuntimeMonitorHandler(handler);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index e368ea9..b1497f9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -49,7 +49,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.InputStream;
import java.io.RandomAccessFile;
+import java.nio.file.Files;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
@@ -135,6 +137,22 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
// convert to absolute path
final File file = new File(rootPath, requestPath);
+
+ if(!file.exists()) {
+ // file does not exist. Try to load it with the classloader
+ ClassLoader cl = StaticFileServerHandler.class.getClassLoader();
+ try(InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) {
+ if (resourceStream == null) {
+ logger.debug("Unable to load requested file {} from classloader", requestPath);
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ logger.debug("Loading missing file from classloader: {}", requestPath);
+ // ensure that directory to file exists.
+ file.getParentFile().mkdirs();
+ Files.copy(resourceStream, file.toPath());
+ }
+ }
if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
sendError(ctx, NOT_FOUND);
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 1802709..a831eba 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -74,18 +74,6 @@ under the License.
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
-
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- <!-- version is derived from base module -->
- </dependency>
-
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- <!-- version is derived from base module -->
- </dependency>
<dependency>
<groupId>com.amazonaws</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
deleted file mode 100644
index 0ecc941..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ /dev/null
@@ -1,737 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
-import org.apache.flink.runtime.messages.ArchiveMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobResponse;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultStringsFound;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound;
-import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.StringUtils;
-import org.eclipse.jetty.io.EofException;
-
-import scala.Tuple3;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-public class JobManagerInfoServlet extends HttpServlet {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoServlet.class);
-
- /** Underlying JobManager */
- private final ActorGateway jobmanager;
- private final ActorGateway archive;
- private final FiniteDuration timeout;
-
-
- public JobManagerInfoServlet(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
- this.jobmanager = jobmanager;
- this.archive = archive;
- this.timeout = timeout;
- }
-
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException,
- IOException {
-
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType("application/json");
-
- Future<Object> response;
- Object result;
-
- try {
- if("archive".equals(req.getParameter("get"))) {
- response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
-
- result = Await.result(response, timeout);
-
- if(!(result instanceof ArchivedJobs)) {
- throw new RuntimeException("RequestArchiveJobs requires a response of type " +
- "ArchivedJobs. Instead the response is of type " + result.getClass() +
- ".");
- } else {
- final List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(
- ((ArchivedJobs) result).asJavaCollection());
-
- writeJsonForArchive(resp.getWriter(), archivedJobs);
- }
- }
- else if("jobcounts".equals(req.getParameter("get"))) {
- response = archive.ask(ArchiveMessages.getRequestJobCounts(), timeout);
-
- result = Await.result(response, timeout);
-
- if(!(result instanceof Tuple3)) {
- throw new RuntimeException("RequestJobCounts requires a response of type " +
- "Tuple3. Instead the response is of type " + result.getClass() +
- ".");
- } else {
- writeJsonForJobCounts(resp.getWriter(), (Tuple3)result);
- }
- }
- else if("job".equals(req.getParameter("get"))) {
- String jobId = req.getParameter("job");
-
- response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout);
-
- result = Await.result(response, timeout);
-
- if(!(result instanceof JobResponse)){
- throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
- "Instead the response is of type " + result.getClass());
- }else {
- final JobResponse jobResponse = (JobResponse) result;
-
- if(jobResponse instanceof JobFound){
- ExecutionGraph archivedJob = ((JobFound)result).executionGraph();
- writeJsonForArchivedJob(resp.getWriter(), archivedJob);
- } else {
- LOG.warn("DoGet:job: Could not find job for job ID " + jobId);
- }
- }
- }
- else if("groupvertex".equals(req.getParameter("get"))) {
- String jobId = req.getParameter("job");
- String groupVertexId = req.getParameter("groupvertex");
-
- // No group vertex specified
- if (groupVertexId.equals("null")) {
- return;
- }
-
- response = archive.ask(new RequestJob(JobID.fromHexString(jobId)), timeout);
-
- result = Await.result(response, timeout);
-
- if(!(result instanceof JobResponse)){
- throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
- "Instead the response is of type " + result.getClass());
- }else {
- final JobResponse jobResponse = (JobResponse) result;
-
- if(jobResponse instanceof JobFound && groupVertexId != null){
- ExecutionGraph archivedJob = ((JobFound)jobResponse).executionGraph();
-
- writeJsonForArchivedJobGroupvertex(resp.getWriter(), archivedJob,
- JobVertexID.fromHexString(groupVertexId));
- } else {
- LOG.warn("DoGet:groupvertex: Could not find job for job ID " + jobId);
- }
- }
- }
- else if("taskmanagers".equals(req.getParameter("get"))) {
-
- response = jobmanager.ask(
- JobManagerMessages.getRequestNumberRegisteredTaskManager(),
- timeout);
-
- result = Await.result(response, timeout);
-
- if(!(result instanceof Integer)) {
- throw new RuntimeException("RequestNumberRegisteredTaskManager requires a " +
- "response of type Integer. Instead the response is of type " +
- result.getClass() + ".");
- } else {
- final int numberOfTaskManagers = (Integer)result;
-
- final Future<Object> responseRegisteredSlots = jobmanager.ask(
- JobManagerMessages.getRequestTotalNumberOfSlots(),
- timeout);
-
- final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
- timeout);
-
- if(!(resultRegisteredSlots instanceof Integer)) {
- throw new RuntimeException("RequestTotalNumberOfSlots requires a response of " +
- "type Integer. Instaed the response of type " +
- resultRegisteredSlots.getClass() + ".");
- } else {
- final int numberOfRegisteredSlots = (Integer) resultRegisteredSlots;
-
- resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
- "\"slots\": "+numberOfRegisteredSlots+"}");
- }
- }
- }
- else if("cancel".equals(req.getParameter("get"))) {
- String jobId = req.getParameter("job");
-
- response = jobmanager.ask(
- new CancelJob(JobID.fromHexString(jobId)),
- timeout);
-
- Await.ready(response, timeout);
- }
- else if("updates".equals(req.getParameter("get"))) {
- String jobId = req.getParameter("job");
- writeJsonUpdatesForJob(resp.getWriter(), JobID.fromHexString(jobId));
- } else if ("version".equals(req.getParameter("get"))) {
- writeJsonForVersion(resp.getWriter());
- }
- else{
- response = jobmanager.ask(
- JobManagerMessages.getRequestRunningJobs(),
- timeout);
-
- result = Await.result(response, timeout);
-
- if(!(result instanceof RunningJobs)){
- throw new RuntimeException("RequestRunningJobs requires a response of type " +
- "RunningJobs. Instead the response of type " + result.getClass() + ".");
- } else {
- final Iterable<ExecutionGraph> runningJobs =
- ((RunningJobs) result).asJavaIterable();
-
- writeJsonForJobs(resp.getWriter(), runningJobs);
- }
- }
-
- } catch (Exception e) {
- resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- resp.getWriter().print(e.getMessage());
- if (LOG.isWarnEnabled()) {
- LOG.warn(StringUtils.stringifyException(e));
- }
- }
- }
-
- /**
- * Writes ManagementGraph as Json for all recent jobs
- *
- * @param wrt
- * @param graphs
- */
- private void writeJsonForJobs(PrintWriter wrt, Iterable<ExecutionGraph> graphs) {
- try {
- wrt.write("[");
-
- Iterator<ExecutionGraph> it = graphs.iterator();
- // Loop Jobs
- while(it.hasNext()){
- ExecutionGraph graph = it.next();
-
- writeJsonForJob(wrt, graph);
-
- //Write seperator between json objects
- if(it.hasNext()) {
- wrt.write(",");
- }
- }
- wrt.write("]");
-
- } catch (EofException eof) { // Connection closed by client
- LOG.info("Info server for jobmanager: Connection closed by client, EofException");
- } catch (IOException ioe) { // Connection closed by client
- LOG.info("Info server for jobmanager: Connection closed by client, IOException");
- }
- }
-
- private void writeJsonForJob(PrintWriter wrt, ExecutionGraph graph) throws IOException {
- //Serialize job to json
- wrt.write("{");
- wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
- wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
- wrt.write("\"status\": \""+ graph.getState() + "\",");
- wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState())+",");
-
- // Serialize ManagementGraph to json
- wrt.write("\"groupvertices\": [");
- boolean first = true;
-
- for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
- //Write seperator between json objects
- if(first) {
- first = false;
- } else {
- wrt.write(","); }
-
- wrt.write(JsonFactory.toJson(groupVertex));
- }
- wrt.write("]");
- wrt.write("}");
-
- }
-
- /**
- * Writes Json with a list of currently archived jobs, sorted by time
- *
- * @param wrt
- * @param graphs
- */
- private void writeJsonForArchive(PrintWriter wrt, List<ExecutionGraph> graphs) {
-
- wrt.write("[");
-
- // sort jobs by time
- Collections.sort(graphs, new Comparator<ExecutionGraph>() {
- @Override
- public int compare(ExecutionGraph o1, ExecutionGraph o2) {
- if(o1.getStatusTimestamp(o1.getState()) < o2.getStatusTimestamp(o2.getState())) {
- return 1;
- } else {
- return -1;
- }
- }
-
- });
-
- // Loop Jobs
- for (int i = 0; i < graphs.size(); i++) {
- ExecutionGraph graph = graphs.get(i);
-
- //Serialize job to json
- wrt.write("{");
- wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
- wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
- wrt.write("\"status\": \""+ graph.getState() + "\",");
- wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState()));
-
- wrt.write("}");
-
- //Write seperator between json objects
- if(i != graphs.size() - 1) {
- wrt.write(",");
- }
- }
- wrt.write("]");
-
- }
-
- /**
- * Writes Json with the job counts
- *
- * @param wrt
- * @param jobCounts
- */
- private void writeJsonForJobCounts(PrintWriter wrt, Tuple3<Integer, Integer, Integer> jobCounts) {
-
- wrt.write("{");
- wrt.write("\"finished\": " + jobCounts._1() + ",");
- wrt.write("\"canceled\": " + jobCounts._2() + ",");
- wrt.write("\"failed\": " + jobCounts._3());
- wrt.write("}");
-
- }
-
- /**
- * Writes infos about archived job in Json format, including groupvertices and groupverticetimes
- *
- * @param wrt
- * @param graph
- */
- private void writeJsonForArchivedJob(PrintWriter wrt, ExecutionGraph graph) {
- try {
- wrt.write("[");
-
- //Serialize job to json
- wrt.write("{");
- wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
- wrt.write("\"jobname\": \"" + graph.getJobName() + "\",");
- wrt.write("\"status\": \"" + graph.getState() + "\",");
- wrt.write("\"SCHEDULED\": " + graph.getStatusTimestamp(JobStatus.CREATED) + ",");
- wrt.write("\"RUNNING\": " + graph.getStatusTimestamp(JobStatus.RUNNING) + ",");
- wrt.write("\"FINISHED\": " + graph.getStatusTimestamp(JobStatus.FINISHED) + ",");
- wrt.write("\"FAILED\": " + graph.getStatusTimestamp(JobStatus.FAILED) + ",");
- wrt.write("\"CANCELED\": " + graph.getStatusTimestamp(JobStatus.CANCELED) + ",");
-
- if (graph.getState() == JobStatus.FAILED) {
- wrt.write("\"failednodes\": [");
- boolean first = true;
- for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
- if (vertex.getExecutionState() == ExecutionState.FAILED) {
- InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
- Throwable failureCause = vertex.getFailureCause();
- if (location != null || failureCause != null) {
- if (first) {
- first = false;
- } else {
- wrt.write(",");
- }
- wrt.write("{");
- wrt.write("\"node\": \"" + (location == null ? "(none)" : location.getFQDNHostname()) + "\",");
- wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\"");
- wrt.write("}");
- }
- }
- }
- wrt.write("],");
- }
-
- // Serialize ManagementGraph to json
- wrt.write("\"groupvertices\": [");
- boolean first = true;
- for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
- //Write seperator between json objects
- if (first) {
- first = false;
- } else {
- wrt.write(",");
- }
-
- wrt.write(JsonFactory.toJson(groupVertex));
-
- }
- wrt.write("],");
-
- // write user config
- ExecutionConfig ec = graph.getExecutionConfig();
- if(ec != null) {
- wrt.write("\"executionConfig\": {");
- wrt.write("\"Execution Mode\": \""+ec.getExecutionMode()+"\",");
- wrt.write("\"Max. number of execution retries\": \""+ec.getNumberOfExecutionRetries()+"\",");
- wrt.write("\"Job parallelism\": \""+ec.getParallelism()+"\",");
- wrt.write("\"Object reuse mode\": \""+ec.isObjectReuseEnabled()+"\"");
- ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
- if(uc != null) {
- Map<String, String> ucVals = uc.toMap();
- if (ucVals != null) {
- String ucString = "{";
- int i = 0;
- for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
- ucString += "\"" + ucVal.getKey() + "\":\"" + ucVal.getValue() + "\"";
- if (++i < ucVals.size()) {
- ucString += ",\n";
- }
- }
- wrt.write(", \"userConfig\": " + ucString + "}");
- }
- else {
- LOG.debug("GlobalJobParameters.toMap() did not return anything");
- }
- }
- else {
- LOG.debug("No GlobalJobParameters were set in the execution config");
- }
- wrt.write("},");
- } else {
- LOG.warn("Unable to retrieve execution config from execution graph");
- }
-
- // write accumulators
- final Future<Object> response = jobmanager.ask(
- new RequestAccumulatorResultsStringified(graph.getJobID()), timeout);
-
- Object result;
- try {
- result = Await.result(response, timeout);
- } catch (Exception ex) {
- throw new IOException("Could not retrieve the accumulator results from the job manager.", ex);
- }
-
- if (result instanceof AccumulatorResultStringsFound) {
- StringifiedAccumulatorResult[] accumulators = ((AccumulatorResultStringsFound) result).result();
-
- wrt.write("\n\"accumulators\": [");
- int i = 0;
- for (StringifiedAccumulatorResult accumulator : accumulators) {
- wrt.write("{ \"name\": \"" + accumulator.getName() + " (" + accumulator.getType() + ")\","
- + " \"value\": \"" + accumulator.getValue() + "\"}\n");
- if (++i < accumulators.length) {
- wrt.write(",");
- }
- }
- wrt.write("],\n");
- }
- else if (result instanceof AccumulatorResultsNotFound) {
- wrt.write("\n\"accumulators\": [],");
- }
- else if (result instanceof AccumulatorResultsErroneous) {
- LOG.error("Could not obtain accumulators for job " + graph.getJobID(),
- ((AccumulatorResultsErroneous) result).cause());
- }
- else {
- throw new RuntimeException("RequestAccumulatorResults requires a response of type " +
- "AccumulatorResultStringsFound. Instead the response is of type " +
- result.getClass() + ".");
- }
-
- wrt.write("\"groupverticetimes\": {");
- first = true;
-
- for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
- if (first) {
- first = false;
- } else {
- wrt.write(",");
- }
-
- // Calculate start and end time for groupvertex
- long started = Long.MAX_VALUE;
- long ended = 0;
-
- // Take earliest running state and latest endstate of groupmembers
- for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
- long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
- if (running != 0 && running < started) {
- started = running;
- }
-
- long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
- long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
- long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
-
- if (finished != 0 && finished > ended) {
- ended = finished;
- }
-
- if (canceled != 0 && canceled > ended) {
- ended = canceled;
- }
-
- if (failed != 0 && failed > ended) {
- ended = failed;
- }
-
- }
-
- wrt.write("\"" + groupVertex.getJobVertexId() + "\": {");
- wrt.write("\"groupvertexid\": \"" + groupVertex.getJobVertexId() + "\",");
- wrt.write("\"groupvertexname\": \"" + groupVertex + "\",");
- wrt.write("\"STARTED\": " + started + ",");
- wrt.write("\"ENDED\": " + ended);
- wrt.write("}");
-
- }
-
- wrt.write("}");
- wrt.write("}");
- wrt.write("]");
- }
- catch (Exception ex) { // Connection closed by client
- LOG.error("Info server for JobManager: Failed to write json for archived jobs", ex);
- }
- }
-
- /**
- * Writes all updates (events) for a given job since a given time
- *
- * @param wrt
- * @param jobId
- */
- private void writeJsonUpdatesForJob(PrintWriter wrt, JobID jobId) {
-
- try {
- final Future<Object> responseArchivedJobs = jobmanager.ask(
- JobManagerMessages.getRequestRunningJobs(),
- timeout);
-
- Object resultArchivedJobs = null;
-
- try{
- resultArchivedJobs = Await.result(responseArchivedJobs, timeout);
- } catch (Exception ex) {
- throw new IOException("Could not retrieve archived jobs from the job manager.", ex);
- }
-
- if(!(resultArchivedJobs instanceof RunningJobs)){
- throw new RuntimeException("RequestArchivedJobs requires a response of type " +
- "RunningJobs. Instead the response is of type " +
- resultArchivedJobs.getClass() + ".");
- } else {
- final Iterable<ExecutionGraph> graphs = ((RunningJobs)resultArchivedJobs).
- asJavaIterable();
-
- //Serialize job to json
- wrt.write("{");
- wrt.write("\"jobid\": \"" + jobId + "\",");
- wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\",");
- wrt.write("\"recentjobs\": [");
-
- boolean first = true;
-
- for(ExecutionGraph g : graphs){
- if (first) {
- first = false;
- } else {
- wrt.write(",");
- }
-
- wrt.write("\"" + g.getJobID() + "\"");
- }
-
- wrt.write("],");
-
- final Future<Object> responseJob = jobmanager.ask(new RequestJob(jobId), timeout);
-
- Object resultJob = null;
-
- try{
- resultJob = Await.result(responseJob, timeout);
- } catch (Exception ex){
- throw new IOException("Could not retrieve the job with jobID " + jobId +
- "from the job manager.", ex);
- }
-
- if(!(resultJob instanceof JobResponse)) {
- throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
- "Instead the response is of type " + resultJob.getClass() + ".");
- } else {
- final JobResponse response = (JobResponse) resultJob;
-
- if(response instanceof JobFound){
- ExecutionGraph graph = ((JobFound)response).executionGraph();
-
- wrt.write("\"vertexevents\": [");
-
- first = true;
- for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
- if (first) {
- first = false;
- } else {
- wrt.write(",");
- }
-
- wrt.write("{");
- wrt.write("\"vertexid\": \"" + ev.getCurrentExecutionAttempt().getAttemptId()
- + "\",");
- wrt.write("\"newstate\": \"" + ev.getExecutionState() + "\",");
- wrt.write("\"timestamp\": \"" + ev.getStateTimestamp(ev.getExecutionState())
- + "\"");
- wrt.write("}");
- }
-
- wrt.write("],");
-
- wrt.write("\"jobevents\": [");
-
- wrt.write("{");
- wrt.write("\"newstate\": \"" + graph.getState() + "\",");
- wrt.write("\"timestamp\": \"" + graph.getStatusTimestamp(graph.getState()) + "\"");
- wrt.write("}");
-
- wrt.write("]");
-
- wrt.write("}");
- } else {
- wrt.write("\"vertexevents\": [],");
- wrt.write("\"jobevents\": [");
- wrt.write("{");
- wrt.write("\"newstate\": \"" + JobStatus.FINISHED + "\",");
- wrt.write("\"timestamp\": \"" + System.currentTimeMillis() + "\"");
- wrt.write("}");
- wrt.write("]");
- wrt.write("}");
- LOG.warn("WriteJsonUpdatesForJob: Could not find job with job ID " + jobId);
- }
- }
- }
-
- } catch (Exception exception) { // Connection closed by client
- LOG.info("Info server for jobmanager: Failed to write json updates for job {}, " +
- "because {}.", jobId, StringUtils.stringifyException(exception));
- }
-
- }
-
- /**
- * Writes info about one particular archived JobVertex in a job, including all member execution vertices, their times and statuses.
- */
- private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, ExecutionGraph graph,
- JobVertexID vertexId) {
- ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
-
- // Serialize ManagementGraph to json
- wrt.write("{\"groupvertex\": " + JsonFactory.toJson(jobVertex) + ",");
-
- wrt.write("\"verticetimes\": {");
- boolean first = true;
- for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
-
- for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
-
- Execution exec = vertex.getCurrentExecutionAttempt();
-
- if(first) {
- first = false;
- } else {
- wrt.write(","); }
-
- wrt.write("\""+exec.getAttemptId() +"\": {");
- wrt.write("\"vertexid\": \"" + exec.getAttemptId() + "\",");
- wrt.write("\"vertexname\": \"" + vertex + "\",");
- wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
- wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
- wrt.write("\"DEPLOYING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
- wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
- wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
- wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
- wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
- wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + "");
- wrt.write("}");
- }
-
- }
- wrt.write("}}");
- }
-
- /**
- * Writes the version and the revision of Flink.
- *
- * @param wrt
- */
- private void writeJsonForVersion(PrintWriter wrt) {
- wrt.write("{");
- wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\",");
- wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\"");
- wrt.write("}");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
deleted file mode 100644
index 89e55d0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.util.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class JsonFactory {
-
- public static String toJson(ExecutionVertex vertex) {
- StringBuilder json = new StringBuilder("");
- json.append("{");
- json.append("\"vertexid\": \"" + vertex.getCurrentExecutionAttempt().getAttemptId() + "\",");
- json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
- json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
-
- InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
- String instanceName = location == null ? "(null)" : location.getFQDNHostname();
-
- json.append("\"vertexinstancename\": \"" + instanceName + "\"");
- json.append("}");
- return json.toString();
- }
-
- public static String toJson(ExecutionJobVertex jobVertex) {
- StringBuilder json = new StringBuilder("");
-
- json.append("{");
- json.append("\"groupvertexid\": \"" + jobVertex.getJobVertexId() + "\",");
- json.append("\"groupvertexname\": \"" + StringUtils.escapeHtml(jobVertex.getJobVertex().getName()) + "\",");
- json.append("\"numberofgroupmembers\": " + jobVertex.getParallelism() + ",");
- json.append("\"groupmembers\": [");
-
- // Count state status of group members
- Map<ExecutionState, Integer> stateCounts = new HashMap<ExecutionState, Integer>();
-
- // initialize with 0
- for (ExecutionState state : ExecutionState.values()) {
- stateCounts.put(state, Integer.valueOf(0));
- }
-
- ExecutionVertex[] vertices = jobVertex.getTaskVertices();
-
- for (int j = 0; j < vertices.length; j++) {
- ExecutionVertex vertex = vertices[j];
-
- json.append(toJson(vertex));
-
- // print delimiter
- if (j != vertices.length - 1) {
- json.append(",");
- }
-
- // Increment state status count
- int count = stateCounts.get(vertex.getExecutionState()) + 1;
- stateCounts.put(vertex.getExecutionState(), count);
- }
-
- json.append("],");
- json.append("\"backwardEdges\": [");
-
- List<IntermediateResult> inputs = jobVertex.getInputs();
-
- for (int inputNumber = 0; inputNumber < inputs.size(); inputNumber++) {
- ExecutionJobVertex input = inputs.get(inputNumber).getProducer();
-
- json.append("{");
- json.append("\"groupvertexid\": \"" + input.getJobVertexId() + "\",");
- json.append("\"groupvertexname\": \"" + StringUtils.escapeHtml(jobVertex.getJobVertex().getName()) + "\"");
- json.append("}");
-
- // print delimiter
- if(inputNumber != inputs.size() - 1) {
- json.append(",");
- }
- }
- json.append("]");
-
- // list number of members for each status
- for (Map.Entry<ExecutionState, Integer> stateCount : stateCounts.entrySet()) {
- json.append(",\""+stateCount.getKey()+"\": " + stateCount.getValue());
- }
-
- json.append("}");
-
- return json.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
deleted file mode 100644
index ecffdfd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.util.StringUtils;
-
-public class LogfileInfoServlet extends HttpServlet {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * The log for this class.
- */
- private static final Logger LOG = LoggerFactory.getLogger(LogfileInfoServlet.class);
-
- private File[] logDirs;
-
-
- public LogfileInfoServlet(File[] logDirs) {
- if(logDirs == null){
- throw new NullPointerException("The given log files are null.");
- }
- this.logDirs = logDirs;
- }
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-
- try {
- if("stdout".equals(req.getParameter("get"))) {
- // Find current stdout file
- sendFile(".*jobmanager-[^\\.]*\\.out", resp);
- }
- else {
- // Find current logfile
- sendFile(".*jobmanager-[^\\.]*\\.log", resp);
- }
- } catch (Throwable t) {
- resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- resp.getWriter().print("Error opening log files':"+t.getMessage());
- if (LOG.isWarnEnabled()) {
- LOG.warn(StringUtils.stringifyException(t));
- }
- }
- }
-
- private void sendFile(String fileNamePattern, HttpServletResponse resp) throws IOException {
- for(File logDir: logDirs) {
- if(logDir == null) {
- continue;
- }
- File[] files = logDir.listFiles();
- if(files == null) {
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType("text/plain");
- resp.getOutputStream().write(("The specified log directory '"+logDir+"' is empty").getBytes());
- } else {
- for (File f : files) {
- // contains "jobmanager" ".log" and no number in the end ->needs improvement
- if (f.getName().matches(fileNamePattern)) {
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType("text/plain");
- writeFile(resp.getOutputStream(), f);
- }
- }
- }
- }
- }
- private static void writeFile(OutputStream out, File file) throws IOException {
- byte[] buf = new byte[4 * 1024]; // 4K buffer
-
- FileInputStream is = null;
- try {
- is = new FileInputStream(file);
- out.write(("==== FILE: "+file.toString()+" ====\n").getBytes());
- int bytesRead;
- while ((bytesRead = is.read(buf)) != -1) {
- out.write(buf, 0, bytesRead);
- }
- } finally {
- if (is != null) {
- is.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
deleted file mode 100644
index 6de1434..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Servlet that displays the Configruation in the webinterface.
- *
- */
-public class MenuServlet extends HttpServlet {
-
- /**
- * Serial UID for serialization interoperability.
- */
- private static final long serialVersionUID = 117543213991787547L;
-
- /**
- * The log for this class.
- */
- private static final Logger LOG = LoggerFactory.getLogger(MenuServlet.class);
-
- /**
- * Array of possible menu entries on the left
- */
- private static final String[] entries = {
- "index", "history", "configuration", "taskmanagers"
- };
-
- /**
- * The names of the menu entries shown in the browser
- */
- private static final String[] names = {
- "Dashboard", "History", "Configuration", "Task Managers"
- };
-
- /**
- * The classes of the icons shown next to the names in the browser
- */
- private static final String[] classes = {
- "fa fa-dashboard", "fa fa-bar-chart-o", "fa fa-keyboard-o", "fa fa-building-o"
- };
-
- public MenuServlet() {
- if (names.length != entries.length || names.length != classes.length) {
- LOG.error("The Arrays 'entries', 'classes' and 'names' differ in thier length. This is not allowed!");
- }
- }
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
-
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType("application/json");
-
- if ("index".equals(req.getParameter("get"))) {
- writeMenu("index", resp);
- } else if ("analyze".equals(req.getParameter("get"))) {
- writeMenu("analyze", resp);
- } else if ("history".equals(req.getParameter("get"))) {
- writeMenu("history", resp);
- } else if ("configuration".equals(req.getParameter("get"))) {
- writeMenu("configuration", resp);
- } else if ("taskmanagers".equals(req.getParameter("get"))) {
- writeMenu("taskmanagers", resp);
- }
-
- }
-
- private void writeMenu(String me, HttpServletResponse resp) throws IOException {
-
- String r = "";
-
- for (int i = 0; i < entries.length; i++) {
- if (entries[i].equals(me)) {
- r += writeLine(3, "<li class='active'><a href='"+ entries[i] +".html'><i class='"+ classes[i] +"'></i> "+ names[i] +"</a></li>");
- } else {
- r += writeLine(3, "<li><a href='"+ entries[i] +".html'><i class='"+ classes[i] +"'></i> "+ names[i] +"</a></li>");
- }
- }
-
- resp.getWriter().write(r);
- }
-
- private String writeLine(int tab, String line) {
- String s = "";
- for (int i = 0; i < tab; i++) {
- s += "\t";
- }
- s+= " " + line + " \n";
- return s;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
deleted file mode 100644
index 1f2bfe0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.Instance;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestStackTrace;
-import org.apache.flink.runtime.messages.TaskManagerMessages.StackTrace;
-import org.apache.flink.util.StringUtils;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * A Servlet that displays the Configuration in the web interface.
- */
-public class SetupInfoServlet extends HttpServlet {
-
- /** Serial UID for serialization interoperability. */
- private static final long serialVersionUID = 3704963598772630435L;
-
- /** The log for this class. */
- private static final Logger LOG = LoggerFactory.getLogger(SetupInfoServlet.class);
-
-
- final private Configuration configuration;
- final private ActorGateway jobmanager;
- final private FiniteDuration timeout;
-
-
- public SetupInfoServlet(Configuration conf, ActorGateway jobManager, FiniteDuration timeout) {
- configuration = conf;
- this.jobmanager = jobManager;
- this.timeout = timeout;
- }
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType("application/json");
-
- if ("globalC".equals(req.getParameter("get"))) {
- writeGlobalConfiguration(resp);
- } else if ("taskmanagers".equals(req.getParameter("get"))) {
- writeTaskmanagers(resp);
- } else if ("stackTrace".equals(req.getParameter("get"))) {
- String instanceId = req.getParameter("instanceID");
- writeStackTraceOfTaskManager(instanceId, resp);
- }
- }
-
- private void writeGlobalConfiguration(HttpServletResponse resp) throws IOException {
- Set<String> keys = configuration.keySet();
- List<String> list = new ArrayList<String>(keys);
- Collections.sort(list);
-
- JSONObject obj = new JSONObject();
- for (String k : list) {
- try {
-
- obj.put(k, configuration.getString(k, ""));
- } catch (JSONException e) {
- LOG.warn("Json object creation failed", e);
- }
- }
-
- PrintWriter w = resp.getWriter();
- w.write(obj.toString());
- }
-
- private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
-
- final Future<Object> response = jobmanager.ask(
- JobManagerMessages.getRequestRegisteredTaskManagers(),
- timeout);
-
- Object obj = null;
-
- try{
- obj = Await.result(response, timeout);
- } catch (Exception ex) {
- throw new IOException("Could not retrieve all registered task managers from the " +
- "job manager.", ex);
- }
-
- if(!(obj instanceof RegisteredTaskManagers)){
- throw new RuntimeException("RequestRegisteredTaskManagers should return a response of " +
- "type RegisteredTaskManagers. Instead the respone is of type " +
- obj.getClass() + ".");
- } else {
-
- final List<Instance> instances = new ArrayList<Instance>(
- ((RegisteredTaskManagers) obj).asJavaCollection());
-
- Collections.sort(instances, INSTANCE_SORTER);
-
- JSONObject jsonObj = new JSONObject();
- JSONArray array = new JSONArray();
- for (Instance instance : instances) {
- JSONObject objInner = new JSONObject();
-
- long time = new Date().getTime() - instance.getLastHeartBeat();
-
- try {
- objInner.put("path", instance.getActorGateway().path());
- objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
- objInner.put("timeSinceLastHeartbeat", time / 1000);
- objInner.put("slotsNumber", instance.getTotalNumberOfSlots());
- objInner.put("freeSlots", instance.getNumberOfAvailableSlots());
- objInner.put("cpuCores", instance.getResources().getNumberOfCPUCores());
- objInner.put("physicalMemory", instance.getResources().getSizeOfPhysicalMemory() >>> 20);
- objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
- objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
- objInner.put("instanceID", instance.getId());
- byte[] report = instance.getLastMetricsReport();
- if(report != null) {
- objInner.put("metrics", new JSONObject(new String(report, "utf-8")));
- }
- array.put(objInner);
- } catch (JSONException e) {
- LOG.warn("Json object creation failed", e);
- }
-
- }
- try {
- jsonObj.put("taskmanagers", array);
- } catch (JSONException e) {
- LOG.warn("Json object creation failed", e);
- }
-
- PrintWriter w = resp.getWriter();
- w.write(jsonObj.toString());
- }
- }
-
-
- private void writeStackTraceOfTaskManager(String instanceIdStr, HttpServletResponse resp) throws IOException {
- InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(instanceIdStr));
- StackTrace message = null;
- Throwable exception = null;
-
- final Future<Object> response = jobmanager.ask(
- new RequestStackTrace(instanceID),
- timeout);
-
- try {
- message = (StackTrace) Await.result(response, timeout);
- } catch (Exception ex) {
- exception = ex;
- }
-
- JSONObject obj = new JSONObject();
- try {
- if (message != null) {
- obj.put("stackTrace", message.stackTrace());
- } else if (exception != null) {
- obj.put("errorMessage", exception.getMessage());
- }
- } catch (JSONException e) {
- LOG.warn("Json object creation failed", e);
- }
-
- PrintWriter writer = resp.getWriter();
- writer.write(obj.toString());
- }
- // --------------------------------------------------------------------------------------------
-
- private static final Comparator<Instance> INSTANCE_SORTER = new Comparator<Instance>() {
- @Override
- public int compare(Instance o1, Instance o2) {
- return o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo());
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
deleted file mode 100644
index 21a1f51..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.web;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URL;
-import java.util.UUID;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import com.google.common.base.Preconditions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.webmonitor.WebMonitor;
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
-import org.eclipse.jetty.server.handler.ResourceHandler;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * This class sets up a web-server that contains a web frontend to display information about running jobs.
- * It instantiates and configures an embedded jetty server.
- */
-public class WebInfoServer implements WebMonitor, LeaderRetrievalListener {
-
- /** Web root dir in the jar */
- private static final String WEB_ROOT_DIR = "web-docs-infoserver";
-
- /** The log for this class. */
- private static final Logger LOG = LoggerFactory.getLogger(WebInfoServer.class);
-
- /** The jetty server serving all requests. */
- private final Server server;
-
- /** Retrieval service for the current leading JobManager */
- private final LeaderRetrievalService leaderRetrievalService;
-
- /** ActorSystem used to retrieve the ActorRefs */
- private final ActorSystem actorSystem;
-
- /** Collection for the registered jetty handlers */
- private final HandlerCollection handlers;
-
- /** Associated configuration */
- private final Configuration config;
-
- /** Timeout for the servlets */
- private final FiniteDuration timeout;
-
- /** Actor look up timeout */
- private final FiniteDuration lookupTimeout;
-
- /** Default jetty handler responsible for serving static content */
- private final ResourceHandler resourceHandler;
-
- /** File paths to log dirs */
- final File[] logDirFiles;
-
- /** The assigned port where jetty is running. */
- private int assignedPort = -1;
-
- /**
- * Creates a new web info server. The server runs the servlets that implement the logic
- * to list all present information concerning the job manager
- *
- * @param config The Flink configuration.
- * @param leaderRetrievalService Retrieval service to obtain the current leader
- *
- * @throws IOException
- * Thrown, if the server setup failed for an I/O related reason.
- */
- public WebInfoServer(
- Configuration config,
- LeaderRetrievalService leaderRetrievalService,
- ActorSystem actorSystem)
- throws IOException {
- if (config == null) {
- throw new IllegalArgumentException("No Configuration has been passed to the web server");
- }
-
- this.config = config;
-
- this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
-
- // if port == 0, jetty will assign an available port.
- int port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
- if (port < 0) {
- throw new IllegalArgumentException("Invalid port for the webserver: " + port);
- }
-
- timeout = AkkaUtils.getTimeout(config);
- lookupTimeout = AkkaUtils.getLookupTimeout(config);
-
- this.actorSystem = actorSystem;
-
- // get base path of Flink installation
- final String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
- final String[] logDirPaths = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY,
- basePath+"/log").split(","); // YARN allows to specify multiple log directories
-
- URL webRootDir = this.getClass().getClassLoader().getResource(WEB_ROOT_DIR);
-
- if(webRootDir == null) {
- throw new FileNotFoundException("Cannot start JobManager web info server. The " +
- "resource " + WEB_ROOT_DIR + " is not included in the jar.");
- }
-
- logDirFiles = new File[logDirPaths.length];
- int i = 0;
- for(String path : logDirPaths) {
- logDirFiles[i++] = new File(path);
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Setting up web info server, using web-root directory " +
- webRootDir.toExternalForm() + ".");
-
- }
-
- server = new Server(port);
-
- // ----- the handler serving all the static files -----
- resourceHandler = new ResourceHandler();
- resourceHandler.setDirectoriesListed(false);
- resourceHandler.setResourceBase(webRootDir.toExternalForm());
-
- // ----- add the handlers to the list handler -----
-
- // make the HandlerCollection mutable so that we can update it later on
- handlers = new HandlerCollection(true);
- handlers.addHandler(resourceHandler);
- server.setHandler(handlers);
- }
-
- /**
- * Starts the web frontend server.
- *
- * @throws Exception
- * Thrown, if the start fails.
- */
- public void start() throws Exception {
- server.start();
-
- final Connector[] connectors = server.getConnectors();
- if (connectors != null && connectors.length > 0) {
- Connector conn = connectors[0];
-
- // we have to use getLocalPort() instead of getPort() http://stackoverflow.com/questions/8884865/how-to-discover-jetty-7-running-port
- this.assignedPort = conn.getLocalPort();
- String host = conn.getHost();
- if (host == null) { // as per method documentation
- host = "0.0.0.0";
- }
- LOG.info("Started web info server for JobManager on {}:{}", host, assignedPort);
- }
- else {
- LOG.warn("Unable to determine local endpoint of web frontend server");
- }
-
- leaderRetrievalService.start(this);
- }
-
- /**
- * Stop the webserver
- */
- public void stop() throws Exception {
- leaderRetrievalService.stop();
- server.stop();
- assignedPort = -1;
- }
-
- public int getServerPort() {
- return this.assignedPort;
- }
-
- @Override
- public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-
- if(leaderAddress != null && !leaderAddress.equals("")) {
- try {
- ActorRef jobManager = AkkaUtils.getActorRef(
- leaderAddress,
- actorSystem,
- lookupTimeout);
- ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
-
- Future<Object> archiveFuture = jobManagerGateway.ask(
- JobManagerMessages.getRequestArchive(),
- timeout);
-
- ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(
- archiveFuture,
- timeout)).actor();
-
- ActorGateway archiveGateway = new AkkaActorGateway(archive, leaderSessionID);
-
- updateHandler(jobManagerGateway, archiveGateway);
- } catch (Exception e) {
- handleError(e);
- }
- }
- }
-
- @Override
- public void handleError(Exception exception) {
- LOG.error("Received error from LeaderRetrievalService.", exception);
-
- try{
- // stop the whole web server
- stop();
- } catch (Exception e) {
- LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
- }
- }
-
- /**
- * Updates the Flink handlers with the current leading JobManager and archive
- *
- * @param jobManager ActorGateway to the current JobManager leader
- * @param archive ActorGateway to the current archive of the leading JobManager
- * @throws Exception
- */
- private void updateHandler(ActorGateway jobManager, ActorGateway archive) throws Exception {
- // ----- the handlers for the servlets -----
- ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
- servletContext.setContextPath("/");
- servletContext.addServlet(
- new ServletHolder(
- new JobManagerInfoServlet(
- jobManager,
- archive,
- timeout)),
- "/jobsInfo");
- servletContext.addServlet(
- new ServletHolder(
- new LogfileInfoServlet(
- logDirFiles)),
- "/logInfo");
- servletContext.addServlet(
- new ServletHolder(
- new SetupInfoServlet(
- config,
- jobManager,
- timeout)),
- "/setupInfo");
- servletContext.addServlet(
- new ServletHolder(
- new MenuServlet()),
- "/menu");
-
- // replace old handlers with new ones
- handlers.setHandlers(new Handler[]{resourceHandler, servletContext});
-
- // start new handler
- servletContext.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/df448625/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 31d9aae..4fca270 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -18,12 +18,25 @@
package org.apache.flink.runtime.webmonitor;
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
/**
* Utilities for the web runtime monitor. This class contains for example methods to build
@@ -32,55 +45,85 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
*/
public final class WebMonitorUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(WebMonitorUtils.class);
+
+ /**
+ * Starts the web runtime monitor. Because the actual implementation of the runtime monitor is
+ * in another project, we load the runtime monitor dynamically.
+ * <p/>
+ * Because failure to start the web runtime monitor is not considered fatal, this method does
+ * not throw any exceptions, but only logs them.
+ *
+ * @param config The configuration for the runtime monitor.
+ * @param leaderRetrievalService Leader retrieval service to get the leading JobManager
+ */
+ public static WebMonitor startWebRuntimeMonitor(
+ Configuration config,
+ LeaderRetrievalService leaderRetrievalService,
+ ActorSystem actorSystem) {
+ // try to load and instantiate the class
+ try {
+ String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+ Class clazz = Class.forName(classname).asSubclass(WebMonitor.class);
+ @SuppressWarnings("unchecked")
+ Constructor<WebMonitor> constructor = clazz.getConstructor(Configuration.class,
+ LeaderRetrievalService.class,
+ ActorSystem.class);
+ return constructor.newInstance(config, leaderRetrievalService, actorSystem);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Could not load web runtime monitor. " +
+ "Probably reason: flink-runtime-web is not in the classpath");
+ LOG.debug("Caught exception", e);
+ return null;
+ } catch (InvocationTargetException e) {
+ LOG.error("WebServer could not be created", e.getTargetException());
+ return null;
+ } catch (Throwable t) {
+ LOG.error("Failed to instantiate web runtime monitor.", t);
+ return null;
+ }
+ }
+
+ public static Map<String, String> fromKeyValueJsonArray (JSONArray parsed) throws JSONException {
+ Map<String, String> hashMap = new HashMap<>();
+
+ for (int i = 0; i < parsed.length(); i++) {
+ JSONObject jsonObject = parsed.getJSONObject(i);
+ String key = jsonObject.getString("key");
+ String value = jsonObject.getString("value");
+ hashMap.put(key, value);
+ }
+
+ return hashMap;
+ }
+
public static JobDetails createDetailsForJob(ExecutionGraph job) {
JobStatus status = job.getState();
-
+
long started = job.getStatusTimestamp(JobStatus.CREATED);
long finished = status.isTerminalState() ? job.getStatusTimestamp(status) : -1L;
-
+
int[] countsPerStatus = new int[ExecutionState.values().length];
long lastChanged = 0;
int numTotalTasks = 0;
-
+
for (ExecutionJobVertex ejv : job.getVerticesTopologically()) {
ExecutionVertex[] vertices = ejv.getTaskVertices();
numTotalTasks += vertices.length;
-
+
for (ExecutionVertex vertex : vertices) {
ExecutionState state = vertex.getExecutionState();
countsPerStatus[state.ordinal()]++;
lastChanged = Math.max(lastChanged, vertex.getStateTimestamp(state));
}
}
-
+
lastChanged = Math.max(lastChanged, finished);
-
+
return new JobDetails(job.getJobID(), job.getJobName(),
- started, finished, status, lastChanged,
+ started, finished, status, lastChanged,
countsPerStatus, numTotalTasks);
}
-
- public static void aggregateExecutionStateTimestamps(long[] timestamps, long[] other) {
- timestamps[CREATED_POS] = Math.min(timestamps[CREATED_POS], other[CREATED_POS]);
- timestamps[SCHEDULED_POS] = Math.min(timestamps[SCHEDULED_POS], other[SCHEDULED_POS]);
- timestamps[DEPLOYING_POS] = Math.min(timestamps[DEPLOYING_POS], other[DEPLOYING_POS]);
- timestamps[RUNNING_POS] = Math.min(timestamps[RUNNING_POS], other[RUNNING_POS]);
- timestamps[FINISHED_POS] = Math.max(timestamps[FINISHED_POS], other[FINISHED_POS]);
- timestamps[CANCELING_POS] = Math.min(timestamps[CANCELING_POS], other[CANCELING_POS]);
- timestamps[CANCELED_POS] = Math.max(timestamps[CANCELED_POS], other[CANCELED_POS]);
- timestamps[FAILED_POS] = Math.min(timestamps[FAILED_POS], other[FAILED_POS]);
- }
-
- // ------------------------------------------------------------------------
-
- private static final int CREATED_POS = ExecutionState.CREATED.ordinal();
- private static final int SCHEDULED_POS = ExecutionState.SCHEDULED.ordinal();
- private static final int DEPLOYING_POS = ExecutionState.DEPLOYING.ordinal();
- private static final int RUNNING_POS = ExecutionState.RUNNING.ordinal();
- private static final int FINISHED_POS = ExecutionState.FINISHED.ordinal();
- private static final int CANCELING_POS = ExecutionState.CANCELING.ordinal();
- private static final int CANCELED_POS = ExecutionState.CANCELED.ordinal();
- private static final int FAILED_POS = ExecutionState.FAILED.ordinal();
/**
* Private constructor to prevent instantiation.
@@ -88,4 +131,6 @@ public final class WebMonitorUtils {
private WebMonitorUtils() {
throw new RuntimeException();
}
-}
+
+
+}
\ No newline at end of file