You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/19 14:33:51 UTC

[3/3] flink git commit: [FLINK-2731][web-dashboard] add access to JobManager stdout and logs

[FLINK-2731][web-dashboard] add access to JobManager stdout and logs

This closes #1233.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03579bb5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03579bb5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03579bb5

Branch: refs/heads/master
Commit: 03579bb5b008b21052ad29d51f0930e9c26c6248
Parents: da071bc
Author: Sachin Goel <sa...@gmail.com>
Authored: Tue Oct 6 20:40:12 2015 +0530
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Oct 19 14:32:09 2015 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  50 ++++++-
 .../runtime/webmonitor/files/MimeTypes.java     |   3 +
 .../files/StaticFileServerHandler.java          |  23 ++-
 .../app/partials/jobmanager/index.jade          |   2 +-
 .../app/partials/jobmanager/log.jade            |  37 +++++
 .../app/partials/jobmanager/logfile.jade        |  16 ---
 .../app/partials/jobmanager/stdout.jade         |  22 +++
 .../web-dashboard/app/scripts/index.coffee      |   8 +-
 .../modules/jobmanager/jobmanager.ctrl.coffee   |  20 +++
 .../modules/jobmanager/jobmanager.svc.coffee    |  30 ++++
 .../web-dashboard/app/styles/index.styl         |   5 +
 .../web-dashboard/web/css/index.css             |   3 +
 flink-runtime-web/web-dashboard/web/js/index.js | 140 +++++++++++++------
 .../web/partials/jobmanager/index.html          |   2 +-
 .../web/partials/jobmanager/log.html            |  39 ++++++
 .../web/partials/jobmanager/logfile.html        |  18 ---
 .../web/partials/jobmanager/stdout.html         |  24 +++-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   2 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   3 +
 .../flink/test/web/WebFrontendITCase.java       |  35 +++--
 .../flink/yarn/YARNSessionFIFOITCase.java       |   7 +-
 .../apache/flink/yarn/FlinkYarnClientBase.java  |   6 +-
 22 files changed, 380 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 7c252dc..649cf75 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
-
+import com.google.common.io.PatternFilenameFilter;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -30,10 +30,11 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
 import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Router;
-
 import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -63,8 +64,10 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,6 +90,12 @@ public class WebRuntimeMonitor implements WebMonitor {
 	/** Logger for web frontend startup / shutdown messages */
 	private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
 
+	/** Job manager's log file pattern */
+	public static final FilenameFilter LOG_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.log");
+
+	/** Job manager's stdout file pattern */
+	public static final FilenameFilter STDOUT_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.out");
+
 	// ------------------------------------------------------------------------
 
 	/** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
@@ -123,6 +132,39 @@ public class WebRuntimeMonitor implements WebMonitor {
 		String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
 		webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
 		LOG.info("Using directory {} for the web interface files", webRootDir);
+		
+		// figure out where our logs are
+		final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
+		final String defaultLogDirectory = flinkRoot + "/log";
+		final String logDirectories = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, defaultLogDirectory);
+
+		// find out which directory holds the path for log and stdout
+		final ArrayList<String> logPaths = new ArrayList<>();
+		final ArrayList<String> outPaths = new ArrayList<>();
+
+		// yarn allows for multiple log directories. Search in all.
+		for(String paths: logDirectories.split(",")) {
+			File dir = new File(paths);
+			if (dir.exists() && dir.isDirectory() && dir.canRead()) {
+				if (dir.listFiles(LOG_FILE_PATTERN).length == 1) {
+					logPaths.add(paths);
+				}
+				if (dir.listFiles(STDOUT_FILE_PATTERN).length == 1) {
+					outPaths.add(paths);
+				}
+			}
+		}
+
+		// we don't want any ambiguities. There must be only one log and out file.
+		if(logPaths.size() != 1 || outPaths.size() != 1) {
+			throw new IllegalConfigurationException("The path to the log and out files (" +
+					logDirectories  + ") is not valid.");
+		}
+
+		final File logDir = new File(logPaths.get(0));
+		final File outDir = new File(outPaths.get(0));
+		LOG.info("Serving job manager logs from {}", logDir.getAbsolutePath());
+		LOG.info("Serving job manager stdout from {}", outDir.getAbsolutePath());
 
 		// port configuration
 		this.configuredPort = cfg.getWebFrontendPort();
@@ -144,8 +186,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 			// the overview - how many task managers, slots, free slots, ...
 			.GET("/overview", handler(new ClusterOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
 
-			// job manager configuration
+			// job manager configuration, log and stdout
 			.GET("/jobmanager/config", handler(new JobManagerConfigHandler(config)))
+			.GET("/jobmanager/log", new StaticFileServerHandler(logDir))
+			.GET("/jobmanager/stdout", new StaticFileServerHandler(outDir))
 
 			// overview over jobs
 			.GET("/joboverview", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, true, true)))

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
index 4efd029..668747c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
@@ -91,6 +91,9 @@ public class MimeTypes {
 		MIME_MAP.put("htm", "text/html");
 		MIME_MAP.put("css", "text/css");
 		MIME_MAP.put("txt", "text/plain");
+		MIME_MAP.put("log", "text/plain");
+		MIME_MAP.put("out", "text/plain");
+		MIME_MAP.put("err", "text/plain");
 		MIME_MAP.put("xml", "text/xml");
 		MIME_MAP.put("csv", "text/csv");
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index b1497f9..51e85b9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -44,12 +44,14 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.util.CharsetUtil;
 
+import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
+import java.io.FilenameFilter;
 import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.text.SimpleDateFormat;
@@ -130,9 +132,11 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 			requestPath = requestPath + "index.html";
 		}
 
-		// convert file separators.
-		if (File.separatorChar != '/') {
-			requestPath = requestPath.replace('/', File.separatorChar);
+		// in case the files being accessed are logs or stdout files, find appropriate paths.
+		if (requestPath.equals("/jobmanager/log")) {
+			requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.LOG_FILE_PATTERN);
+		} else if (requestPath.equals("/jobmanager/stdout")) {
+			requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.STDOUT_FILE_PATTERN);
 		}
 
 		// convert to absolute path
@@ -178,7 +182,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 				return;
 			}
 		}
-
+		
 		if (logger.isDebugEnabled()) {
 			logger.debug("Responding with file '" + file.getAbsolutePath() + '\'');
 		}
@@ -195,7 +199,11 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
 		setContentTypeHeader(response, file);
-		setDateAndCacheHeaders(response, file);
+
+		// since the log and out files are rapidly changing, we don't want to browser to cache them
+		if (!(requestPath.contains("log") || requestPath.contains("out"))) {
+			setDateAndCacheHeaders(response, file);
+		}
 		if (HttpHeaders.isKeepAlive(request)) {
 			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
 		}
@@ -300,4 +308,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 		String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType();
 		response.headers().set(CONTENT_TYPE, mimeFinal);
 	}
+
+	private static String getFileName(File directory, FilenameFilter pattern) {
+		File[] files = directory.listFiles(pattern);
+		return files.length == 0 ? null : files[0].getName();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/web-dashboard/app/partials/jobmanager/index.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobmanager/index.jade b/flink-runtime-web/web-dashboard/app/partials/jobmanager/index.jade
index 00ff0fd..a162b20 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobmanager/index.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobmanager/index.jade
@@ -27,7 +27,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional
     li(ui-sref-active='active')
       a(ui-sref=".config") Configuration
     li(ui-sref-active='active')
-      a(ui-sref=".logfile") Logs
+      a(ui-sref=".log") Logs
     li(ui-sref-active='active')
       a(ui-sref=".stdout") Stdout
 

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/web-dashboard/app/partials/jobmanager/log.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobmanager/log.jade b/flink-runtime-web/web-dashboard/app/partials/jobmanager/log.jade
new file mode 100644
index 0000000..33ef989
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/partials/jobmanager/log.jade
@@ -0,0 +1,37 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+table.table.table-properties
+  thead
+    tr
+      th(colspan="2")
+        .row
+          .col-xs-10
+            | Job Manager Logs
+          .col-xs-1.text-right
+            a(ng-click="reloadData()" class="show-pointer")
+              i.fa.fa-refresh
+          .col-xs-1.text-left
+            a(href="jobmanager/log")
+              i.fa.fa-download
+
+  tbody
+    tr
+      td(colspan="2")
+        pre
+          | {{jobmanager.log}}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/web-dashboard/app/partials/jobmanager/logfile.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobmanager/logfile.jade b/flink-runtime-web/web-dashboard/app/partials/jobmanager/logfile.jade
deleted file mode 100644
index c0de885..0000000
--- a/flink-runtime-web/web-dashboard/app/partials/jobmanager/logfile.jade
+++ /dev/null
@@ -1,16 +0,0 @@
-//
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
- 
-      http://www.apache.org/licenses/LICENSE-2.0
- 
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/web-dashboard/app/partials/jobmanager/stdout.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobmanager/stdout.jade b/flink-runtime-web/web-dashboard/app/partials/jobmanager/stdout.jade
index c0de885..b84377d 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobmanager/stdout.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobmanager/stdout.jade
@@ -14,3 +14,25 @@
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
+
+
+table.table.table-properties
+  thead
+    tr
+      th(colspan="2")
+        .row
+          .col-xs-10
+            | Job Manager Output
+          .col-xs-1.text-right
+            a(ng-click="reloadData()" class="show-pointer")
+              i.fa.fa-refresh
+          .col-xs-1.text-left
+            a(href="jobmanager/stdout")
+              i.fa.fa-download
+
+  tbody
+    tr
+      td(colspan="2")
+        pre
+          | {{jobmanager.stdout}}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index 0fc4cb8..af1aa3a 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -273,12 +273,14 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
     views:
       details:
         templateUrl: "partials/jobmanager/stdout.html"
+        controller: 'JobManagerStdoutController'
 
-  .state "jobmanager.logfile",
-    url: "/logfile"
+  .state "jobmanager.log",
+    url: "/log"
     views:
       details:
-        templateUrl: "partials/jobmanager/logfile.html"
+        templateUrl: "partials/jobmanager/log.html"
+        controller: 'JobManagerLogsController'
 
   $urlRouterProvider.otherwise "/overview"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.ctrl.coffee
index b3534c2..eee3b36 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.ctrl.coffee
@@ -23,3 +23,23 @@ angular.module('flinkApp')
     if !$scope.jobmanager?
       $scope.jobmanager = {}
     $scope.jobmanager['config'] = data
+
+.controller 'JobManagerLogsController', ($scope, JobManagerLogsService) ->
+  JobManagerLogsService.loadLogs().then (data) ->
+    if !$scope.jobmanager?
+      $scope.jobmanager = {}
+    $scope.jobmanager['log'] = data
+
+  $scope.reloadData = () ->
+    JobManagerLogsService.loadLogs().then (data) ->
+      $scope.jobmanager['log'] = data
+
+.controller 'JobManagerStdoutController', ($scope, JobManagerStdoutService) ->
+  JobManagerStdoutService.loadStdout().then (data) ->
+    if !$scope.jobmanager?
+      $scope.jobmanager = {}
+    $scope.jobmanager['stdout'] = data
+
+  $scope.reloadData = () ->
+    JobManagerStdoutService.loadStdout().then (data) ->
+      $scope.jobmanager['stdout'] = data

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.svc.coffee
index e87565d..e861143 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.svc.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobmanager/jobmanager.svc.coffee
@@ -32,3 +32,33 @@ angular.module('flinkApp')
     deferred.promise
 
   @
+
+.service 'JobManagerLogsService', ($http, flinkConfig, $q) ->
+  logs = {}
+
+  @loadLogs = ->
+    deferred = $q.defer()
+
+    $http.get("jobmanager/log")
+    .success (data, status, headers, config) ->
+      logs = data
+      deferred.resolve(data)
+
+    deferred.promise
+
+  @
+
+.service 'JobManagerStdoutService', ($http, flinkConfig, $q) ->
+  stdout = {}
+
+  @loadStdout = ->
+    deferred = $q.defer()
+
+    $http.get("jobmanager/stdout")
+    .success (data, status, headers, config) ->
+      stdout = data
+      deferred.resolve(data)
+
+    deferred.promise
+
+  @

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/web-dashboard/app/styles/index.styl
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/styles/index.styl b/flink-runtime-web/web-dashboard/app/styles/index.styl
index d2702e7..25e1170 100644
--- a/flink-runtime-web/web-dashboard/app/styles/index.styl
+++ b/flink-runtime-web/web-dashboard/app/styles/index.styl
@@ -420,3 +420,8 @@ livechart
 #non-heap-mem
   background-color: #90ed7d
 
+a.show-pointer
+  cursor: pointer
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/03579bb5/flink-runtime-web/web-dashboard/web/css/index.css
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/css/index.css b/flink-runtime-web/web-dashboard/web/css/index.css
index 394e640..43f7d98 100644
--- a/flink-runtime-web/web-dashboard/web/css/index.css
+++ b/flink-runtime-web/web-dashboard/web/css/index.css
@@ -591,3 +591,6 @@ svg.graph .node-label {
 #non-heap-mem {
   background-color: #90ed7d;
 }
+a.show-pointer {
+  cursor: pointer;
+}