You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/16 19:48:00 UTC

[jira] [Commented] (FLINK-6387) Flink UI support access log

    [ https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16652344#comment-16652344 ] 

ASF GitHub Bot commented on FLINK-6387:
---------------------------------------

zentol closed pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access log
URL: https://github.com/apache/flink/pull/3777
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index d129405e06a..153423bbbd1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -168,6 +168,10 @@
 		key("jobmanager.archive.fs.dir")
 			.noDefaultValue();
 
+	/** Config parameter indicating whether to enable the web access log (disabled by default). */
+	public static final ConfigOption<Boolean> JOB_MANAGER_WEB_ACCESSLOG_ENABLE =
+		key("jobmanager.web.accesslog.enable")
+			.defaultValue(false);
 	// ---------------------------------------------------------------------------------------------
 
 	private JobManagerOptions() {
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
index d14b7a22b1d..ddb6da80ab7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
@@ -33,6 +33,7 @@
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.HttpContent;
 import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpHeaders.Names;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpObject;
 import io.netty.handler.codec.http.HttpRequest;
@@ -50,10 +51,13 @@
 import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Date;
 import java.util.UUID;
 
 /**
@@ -66,19 +70,22 @@
 public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {
 
 	private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
+	private static final Logger LOG = LoggerFactory.getLogger(HttpRequestHandler.class);
 
 	/** A decoder factory that always stores POST chunks on disk */
 	private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
 
 	private final File tmpDir;
+	private final boolean enableAccesslog;
 
 	private HttpRequest currentRequest;
 
 	private HttpPostRequestDecoder currentDecoder;
 	private String currentRequestPath;
 
-	public HttpRequestHandler(File tmpDir) {
+	public HttpRequestHandler(File tmpDir, boolean enableAccesslog) {
 		this.tmpDir = tmpDir;
+		this.enableAccesslog = enableAccesslog;
 	}
 
 	@Override
@@ -100,6 +107,10 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
 					currentDecoder = null;
 				}
 
+				if (enableAccesslog) {
+					logAccess(ctx, currentRequest);
+				}
+
 				if (currentRequest.getMethod() == HttpMethod.GET || currentRequest.getMethod() == HttpMethod.DELETE) {
 					// directly delegate to the router
 					ctx.fireChannelRead(currentRequest);
@@ -183,4 +194,28 @@ else if (currentDecoder != null && msg instanceof HttpContent) {
 			}
 		}
 	}
+
+	/**
+	 * Writes one access-log record for the request at INFO level. The caller invokes this only when
+	 * {@link org.apache.flink.configuration.JobManagerOptions#JOB_MANAGER_WEB_ACCESSLOG_ENABLE} is set.
+	 * Record format (SLF4J substitutes "{}" placeholders only; "%s" would be logged literally):
+	 * remote_addr - [time_local] "request_method URI protocolVersion" "http_referer" "http_user_agent"
+	 */
+	private void logAccess(ChannelHandlerContext ctx, HttpRequest req) {
+		HttpHeaders headers = req.headers();
+		if (headers != null) {
+			LOG.info("{} - [{}] \"{} {} {}\" \"{}\" \"{}\"",
+				ctx.channel().remoteAddress(), new Date().toString(), req.getMethod().name(),
+				req.getUri(), req.getProtocolVersion().text(), getHeader(Names.REFERER, headers),
+				getHeader(Names.USER_AGENT, headers));
+		}
+	}
+
+	private String getHeader(String key, HttpHeaders headers) {
+		if (headers.contains(key) && headers.get(key) != null) {
+			return headers.get(key);
+		} else {
+			return "-";
+		}
+	}
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 19ec08ad316..82ccfe1c971 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -30,6 +30,7 @@
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
 import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
@@ -65,6 +66,8 @@ public WebFrontendBootstrap(
 		this.uploadDir = Preconditions.checkNotNull(directory);
 		this.serverSSLContext = sslContext;
 
+		final boolean enableAccesslog = config.getBoolean(JobManagerOptions.JOB_MANAGER_WEB_ACCESSLOG_ENABLE);
+
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 
 			@Override
@@ -82,7 +85,7 @@ protected void initChannel(SocketChannel ch) {
 				ch.pipeline()
 					.addLast(new HttpServerCodec())
 					.addLast(new ChunkedWriteHandler())
-					.addLast(new HttpRequestHandler(uploadDir))
+					.addLast(new HttpRequestHandler(uploadDir, enableAccesslog))
 					.addLast(handler.name(), handler)
 					.addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log));
 			}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Flink UI support access log
> ---------------------------
>
>                 Key: FLINK-6387
>                 URL: https://issues.apache.org/jira/browse/FLINK-6387
>             Project: Flink
>          Issue Type: Improvement
>          Components: Webfrontend
>            Reporter: shijinkui
>            Assignee: shijinkui
>            Priority: Major
>
> Record each user request in an access log. Append user access entries to the log file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)