You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2016/03/14 19:08:13 UTC

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/1790

    [FLINK-2732] Display TM logs in Dashboard

    This PR allows the TM logs to be displayed in the Dashboard.
    
    The page displaying information about a single TaskManager now has two additional tabs (previously only metrics), log and stdout, similarly to the JobManager page.
    
    When this tabs are accessed, a TaskManagerLogHandler is called that roughly does the following:
    * get the TM ID from the REST API call
    * ask JM for the TM gateway belonging to this ID
    * tell the TM to upload his logs to the Blob store, and to return the BlobKey
    * send the BlobKey to the JM , asking for the path to the log file
    * display file in the Dashboard
    
    Should the BlobService not be available (e.g when the start-local script is used), a proper message is displayed instead of the log.
    
    At all times only a single version of the TM log/out file exists in the Blob store; whenever the log is requested the old entry is deleted, if the file has changed. This is handled by the TaskManagerLogHandler. To prevent race-conditions when spamming the refresh button, only a single transfer can be active for each TM. Should the user hit refresh while a transfer is in progress, "loading..." will be displayed instead of the log.
    
    When the TM uploads it's log a separate Thread is created to do that, so that the TM isn't blocked for a while if the file is too big. Similarly, the handler doesn't block while waiting the the TM upload, and instead uses a FutureCallback to handle the finished upload.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 2732_taskmanager_logs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1790.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1790
    
----
commit 3ffe3d7fc310559a3deaf223f2dd78bc05b670b2
Author: zentol <ch...@apache.org>
Date:   2016-03-14T17:55:42Z

    [FLINK-2732] Display TM logs in Dashboard

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56172503
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    +
    +						//delete previous log file, if it is different than the current one
    +						if ((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).containsKey(taskManagerID)) {
    +							if (!blobKey.equals((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).get(taskManagerID))) {
    +								jobManager.tell(JobManagerMessages.getDeleteTaskManagerLog((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).remove(taskManagerID)));
    +								(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +							}
    +						} else {
    +							(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +						}
    +
    +						//send blobkey to jobmanager
    +						Future<Object> logPathFuture = jobManager.ask(JobManagerMessages.getRequestTaskManagerLog(blobKey), timeout);
    +						String filePath = (String) Await.result(logPathFuture, timeout);
    +
    +						File file = new File(filePath);
    +						final RandomAccessFile raf;
    +						try {
    +							raf = new RandomAccessFile(file, "r");
    +						} catch (FileNotFoundException e) {
    +							sendError(ctx, NOT_FOUND);
    +							return;
    +						}
    +						long fileLength = raf.length();
    +
    +						HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    +						setContentTypeHeader(response, file);
    +
    +						if (HttpHeaders.isKeepAlive(request)) {
    +							response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
    +						}
    +						HttpHeaders.setContentLength(response, fileLength);
    +
    +						// write the initial line and the header.
    +						ctx.write(response);
    +
    +						// write the content.
    +						ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise())
    +							.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
    +								@Override
    +								public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
    +									synchronized (lastRequestCompleted) {
    +										lastRequestCompleted.put(taskManagerID, true);
    --- End diff --
    
    What if a failure occurs before this listener is added. Wouldn't that mean that the `lastRequestCompleted` entry for this `taskManagerID` would remain `false` for all eternity?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56178145
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    --- End diff --
    
    if success is null, is failure always != null?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56309084
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    --- End diff --
    
    going for the RuntimeMonitorhandlerBase approach.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56160952
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    +      //create new Thread to upload log, to not block the TM for too long
    +      new Thread() {
    +        override def run(): Unit = {
    +          val jmHost = jobManager.path.address.host.get
    +          val port = blobService.get.getPort
    --- End diff --
    
    Maybe we should extend the `BlobService` to return the `serverAddress` and not only the port.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-197258802
  
    yeah...the changes in the jobmanager are fairly easy, albeit hacky and ugly. the FlinkMiniCluster however is pain.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56188957
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    +      //create new Thread to upload log, to not block the TM for too long
    +      new Thread() {
    +        override def run(): Unit = {
    +          val jmHost = jobManager.path.address.host.get
    +          val port = blobService.get.getPort
    +          val logFilePath = System.getProperty("log.file");
    --- End diff --
    
    and how would we pass that value to the logger?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56181881
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    --- End diff --
    
    i think it's overkill for a binary option, and won't make the code more readable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-197322838
  
    second batch of changes coming in.
    * TaskManager checks whether blobService is defined
    * new class: RuntimeMonitorHandlerBase
     * RuntimeMonitorHandler/TaskManagerLogHandler extend RuntimeMonnitorHandlerBase
     * removes some redundant code
    * TaskManagerLogHandler uses it's own BlobCache
     * reverted addition of messages to JobManagerMessages
     * reverted addition of methods to LibraryCacheManager
     * new method in BlobCache, to delete a Blob both locally and on the server
    
    Issues left:
    * tests


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56176392
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    --- End diff --
    
    What do you need for that? I would assume that everything is already there.
    
    What if we implement a different version of the `RequestHandler` which receives the `ChannelHandlerContext`. Or we create a `RuntimeMonitorHandlerBase` which implements `channelRead0` but not `respondLeader`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56184730
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    --- End diff --
    
    No simply use future composition: `map`, `zip`, etc. That way you can simply define operations for future values. See also http://doc.akka.io/docs/akka/snapshot/scala/futures.html


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56308139
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    --- End diff --
    
    only problem being that StaticFileServerhandler has a File constructor argument...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-196804278
  
    I think we should an automated test for the change.
    The `WebFrontendITCase` or one of the YARN tests is probably the right place for such a test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56174087
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    --- End diff --
    
    The RequestHandler expects a String to be returned. Do you think it's a good idea to do that since it means allocating a string containing the full log?
    
    In addition, i wanted to reuse the code that is used to serve the jobmanager files, as i can't assess in general whether the Requesthandler exposes everything we need; can you?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56156521
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    +      //create new Thread to upload log, to not block the TM for too long
    +      new Thread() {
    +        override def run(): Unit = {
    --- End diff --
    
    Exceptions thrown from this run() method are not properly catched.
    Any failures in the thread will be written to system out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56171870
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    --- End diff --
    
    Same problem here with blocking operation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56157971
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    +      //create new Thread to upload log, to not block the TM for too long
    +      new Thread() {
    +        override def run(): Unit = {
    +          val jmHost = jobManager.path.address.host.get
    +          val port = blobService.get.getPort
    +          val logFilePath = System.getProperty("log.file");
    --- End diff --
    
    Okay. It seems that the JobManager is trying to get it from the property. if that is not set, it tries to get it from the config and then it fails.
    
    I think we should do something similar for the TaskManager as well. This allows users running into issues with this to configure a path.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56161389
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -310,6 +310,12 @@ class TaskManager(
     
         case FatalError(message, cause) =>
           killTaskManagerFatal(message, cause)
    +
    +    case RequestTaskManagerLog(requestType : LogTypeRequest) =>
    +      handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
    --- End diff --
    
    Why not simply returning a `Future[BlobKey]` from the `handleRequestTaskManagerLog` which is simply piped to the `sender`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-211286298
  
    Merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56176643
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -310,6 +310,12 @@ class TaskManager(
     
         case FatalError(message, cause) =>
           killTaskManagerFatal(message, cause)
    +
    +    case RequestTaskManagerLog(requestType : LogTypeRequest) =>
    +      handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
    +
    +    case IsBlobServiceDefined =>
    +      sender() ! (blobService.isDefined && currentJobManager.get.path.address.host.isDefined)
    --- End diff --
    
    What is the value of `path` in the local case? Maybe we should then extend the `BlobService` to simply give back a `BlobClient` which has the right connection address.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-198348395
  
    It also reduces the number of changed classes, since no new messsages to the jobmanager had to be added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56173641
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---
    @@ -234,7 +235,9 @@ public WebRuntimeMonitor(
     			.GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs)))
     
     			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
    -			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
    +			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
    +			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", new TaskManagerLogHandler(retriever, actorSystem.dispatcher(), jobManagerAddressPromise.future(), timeout, true))
    --- End diff --
    
    I think we shouldn't use the dispatcher of the `ActorSystem` as the `ExecutorService` here. The `ActorSystem` is usually the `ActorSystem` of the JM and we shouldn't put load on its `ThreadPool`. I would rather use a dedicated `ThreadPool` for the execution of the futures.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56156936
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---
    @@ -460,6 +461,10 @@ object JobManagerMessages {
         */
       case class DisposeSavepointFailure(cause: Throwable)
     
    +  case class RequestTaskManagerLog(blobKey: BlobKey)
    --- End diff --
    
    ScalaDocs missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-198348202
  
    the files are currently loaded twice. First, the logs are copied from the Taskmanager to the JobManager blob store. They are then copied again from the JobManager blob store to the TaskmanagerLogHandler BlobCache. This was @tillrohrmann idea, and is meant to make the handler easier to change when the Dashboard and JobManager are separated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56183974
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    --- End diff --
    
    so a triple nested future then?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56155553
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    --- End diff --
    
    Good attribution :+1: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56173268
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    +
    +						//delete previous log file, if it is different than the current one
    +						if ((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).containsKey(taskManagerID)) {
    +							if (!blobKey.equals((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).get(taskManagerID))) {
    +								jobManager.tell(JobManagerMessages.getDeleteTaskManagerLog((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).remove(taskManagerID)));
    +								(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +							}
    +						} else {
    +							(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +						}
    +
    +						//send blobkey to jobmanager
    +						Future<Object> logPathFuture = jobManager.ask(JobManagerMessages.getRequestTaskManagerLog(blobKey), timeout);
    --- End diff --
    
    I don't like that we request the file path here. This more or less means that the handler must have access to the filesystem as the JM has. In the long run, we want to disentangle the `WebRuntimeMonitor` from the `JobManager` so that both components are independent of each other. I would rather prefer that the `WebRuntimeMonitor` has its own `BlobService` which it uses to request the log file given the `BlobKey` received from the TaskManager. In the first implementation, this `BlobService` could simply be the `BlobServer` of the JM. By doing it this way, we would create an abstraction which would make it easier to separate the `WebRuntimeMonitor` from the `JobManager`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56173858
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala ---
    @@ -114,6 +114,16 @@ object TaskManagerMessages {
         */
       case class JobManagerLeaderAddress(jobManagerAddress: String, leaderSessionID: UUID)
     
    +  sealed trait LogTypeRequest
    +
    +  case object LogFileRequest extends LogTypeRequest
    +
    +  case object StdOutFileRequest extends LogTypeRequest
    +
    +  case class RequestTaskManagerLog(requestType : LogTypeRequest)
    +
    +  case object IsBlobServiceDefined
    --- End diff --
    
    ScalaDocs still missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56171686
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    --- End diff --
    
    I think it would be easier to use a `ConcurrentHashMap` in combination with `putIfAbsent` to guard against multiple concurrent requests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56180847
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    --- End diff --
    
    Maybe we could extend the `BlobService` so that it can create a `BlobClient` for you. Then you don't have to worry about the address, because the `BlobService` should know it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1790


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-197266303
  
    @tillrohrmann i don't think we should do this now. We would have to change several method related to the creation of the JobManager (to return the BlobService), potentially hack around in the tests to get the BlobService from the active Jobmanager to feed it into the WebMonitor (which is created separately).
    
    The gains are negligible; it a 2 minute fix to change the Handler itself.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-198358391
  
    We could also have passed the `BlobServer` of the `JobManager` to the `TaskManagerLogHandler` to avoid the extra copying. But this would mean that we would have to move the `WebRuntimeMonitor` creation in the `startJobManagerActors` method, which is not so nice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56161084
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -310,6 +310,12 @@ class TaskManager(
     
         case FatalError(message, cause) =>
           killTaskManagerFatal(message, cause)
    +
    +    case RequestTaskManagerLog(requestType : LogTypeRequest) =>
    +      handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
    +
    +    case IsBlobServiceDefined() =>
    +      sender() ! (blobService.isDefined && currentJobManager.get.path.address.host.isDefined)
    --- End diff --
    
    if the `blobService` is defined, then it should have a valid `serverAddress` where the `BlobServer` is running.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-196528621
  
    Screenshots added:
    ![TaskManager main page](https://cloud.githubusercontent.com/assets/5725237/13760504/c270cafc-ea33-11e5-9ec8-3c86ceb8367a.png)
    ![lTaskManager log page](https://cloud.githubusercontent.com/assets/5725237/13760505/c271649e-ea33-11e5-945b-1ba2f4d70f9c.png)
    ![s3](https://cloud.githubusercontent.com/assets/5725237/13760572/1d124436-ea34-11e5-9098-60f634d752b4.png)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56157071
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala ---
    @@ -114,6 +114,16 @@ object TaskManagerMessages {
         */
       case class JobManagerLeaderAddress(jobManagerAddress: String, leaderSessionID: UUID)
     
    +  sealed trait LogTypeRequest
    +
    +  case object LogFileRequest extends LogTypeRequest
    +
    +  case object StdOutFileRequest extends LogTypeRequest
    +
    +  case class RequestTaskManagerLog(requestType : LogTypeRequest)
    +
    +  case class IsBlobServiceDefined()
    --- End diff --
    
    ScalaDocs missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56159993
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    +      //create new Thread to upload log, to not block the TM for too long
    +      new Thread() {
    +        override def run(): Unit = {
    +          val jmHost = jobManager.path.address.host.get
    +          val port = blobService.get.getPort
    +          val logFilePath = System.getProperty("log.file");
    +          val file: File = requestType match {
    +            case LogFileRequest => new File(logFilePath);
    +            case StdOutFileRequest =>
    +              new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
    +          }
    +
    +          val client: BlobClient = new BlobClient(new InetSocketAddress(jmHost, port))
    +
    +          sender ! client.put(new FileInputStream(file))
    +        }
    +      }.run()
    --- End diff --
    
    This won't start a new thread but simply call the just overwritten `run` method. Use `start` instead. Furthermore, it is not encouraged to extend the Thread class. It is better to implement a `Runnable` or even better avoiding to deal with raw threads at all. I guess it would be easier to simply spawn a `Future` which is executed by a respective `ExecutionContext`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-198348409
  
    I see


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-206843425
  
    Test failures should be resolved.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56156930
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---
    @@ -460,6 +461,10 @@ object JobManagerMessages {
         */
       case class DisposeSavepointFailure(cause: Throwable)
     
    +  case class RequestTaskManagerLog(blobKey: BlobKey)
    +
    +  case class DeleteTaskManagerLog(blobKey: BlobKey)
    --- End diff --
    
    ScalaDocs missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56307836
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    --- End diff --
    
    a good solution would be to extend the StaticFileServerHandler, and modify some fields/methods to be protected instead of final. this way we could remove as much redundant code as possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56172532
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    +
    +						//delete previous log file, if it is different than the current one
    +						if ((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).containsKey(taskManagerID)) {
    +							if (!blobKey.equals((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).get(taskManagerID))) {
    +								jobManager.tell(JobManagerMessages.getDeleteTaskManagerLog((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).remove(taskManagerID)));
    +								(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +							}
    +						} else {
    +							(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +						}
    +
    +						//send blobkey to jobmanager
    +						Future<Object> logPathFuture = jobManager.ask(JobManagerMessages.getRequestTaskManagerLog(blobKey), timeout);
    +						String filePath = (String) Await.result(logPathFuture, timeout);
    +
    +						File file = new File(filePath);
    +						final RandomAccessFile raf;
    +						try {
    +							raf = new RandomAccessFile(file, "r");
    +						} catch (FileNotFoundException e) {
    +							sendError(ctx, NOT_FOUND);
    +							return;
    +						}
    +						long fileLength = raf.length();
    +
    +						HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    +						setContentTypeHeader(response, file);
    +
    +						if (HttpHeaders.isKeepAlive(request)) {
    +							response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
    +						}
    +						HttpHeaders.setContentLength(response, fileLength);
    +
    +						// write the initial line and the header.
    +						ctx.write(response);
    +
    +						// write the content.
    +						ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise())
    +							.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
    +								@Override
    +								public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
    +									synchronized (lastRequestCompleted) {
    +										lastRequestCompleted.put(taskManagerID, true);
    +									}
    +								}
    +							});
    +						ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
    +
    +						// close the connection, if no keep-alive is needed
    +						if (!HttpHeaders.isKeepAlive(request)) {
    +							lastContentFuture.addListener(ChannelFutureListener.CLOSE);
    +						}
    +					} catch (Exception e) {
    +						logger.error("Serving of taskmanager log failed: " + e.getMessage());
    +					}
    +				}
    +			}, executor);
    +		} else {
    +			display(ctx, request, "loading...");
    +			return;
    --- End diff --
    
    not necessary


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-197232389
  
    The jobmanager/dashboard separation should be really easy to do. You just have to give the `WebRuntimeMonitor` a `BlobService` as a constructor parameter. And now you can simply pass in the JobManager's `BlobServer` as a first implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56156180
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    +
    +						//delete previous log file, if it is different than the current one
    +						if ((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).containsKey(taskManagerID)) {
    +							if (!blobKey.equals((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).get(taskManagerID))) {
    +								jobManager.tell(JobManagerMessages.getDeleteTaskManagerLog((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).remove(taskManagerID)));
    +								(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +							}
    +						} else {
    +							(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +						}
    +
    +						//send blobkey to jobmanager
    +						Future<Object> logPathFuture = jobManager.ask(JobManagerMessages.getRequestTaskManagerLog(blobKey), timeout);
    +						String filePath = (String) Await.result(logPathFuture, timeout);
    +
    +						File file = new File(filePath);
    +						final RandomAccessFile raf;
    +						try {
    +							raf = new RandomAccessFile(file, "r");
    +						} catch (FileNotFoundException e) {
    +							sendError(ctx, NOT_FOUND);
    +							return;
    +						}
    +						long fileLength = raf.length();
    +
    +						HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    +						setContentTypeHeader(response, file);
    +
    +						if (HttpHeaders.isKeepAlive(request)) {
    +							response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
    +						}
    +						HttpHeaders.setContentLength(response, fileLength);
    +
    +						// write the initial line and the header.
    +						ctx.write(response);
    +
    +						// write the content.
    +						ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise())
    +							.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
    +								@Override
    +								public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
    +									synchronized (lastRequestCompleted) {
    +										lastRequestCompleted.put(taskManagerID, true);
    +									}
    +								}
    +							});
    +						ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
    +
    +						// close the connection, if no keep-alive is needed
    +						if (!HttpHeaders.isKeepAlive(request)) {
    +							lastContentFuture.addListener(ChannelFutureListener.CLOSE);
    +						}
    +					} catch (Exception e) {
    +						logger.error("Serving of taskmanager log failed: " + e.getMessage());
    --- End diff --
    
    I would add the exception to the log, not only the message


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-205270976
  
    Looks good to merge. @zentol Could you rebase this to the latest master?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56160873
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    --- End diff --
    
    If the `blobService` is defined the `jobManager` should have a defined address as well. I think it would be better to use pattern matching to extract the blobService from the `Option`. This is more scalaesque.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-198469599
  
    Cool! Thank you for fixing it so quickly.
    
    From my point of view, the change is good to merge. I've tested it on a cluster and locally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56186817
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    --- End diff --
    
    i think i found a solution that keeps us both happy...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-198345651
  
    Also, I'm not sure if this log message is correct:
    ```
    2016-03-18 12:59:43,376 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading ee982985ecf8b6a9e3d33eb4eb7d84f480666e33 from localhost/127.0.0.1:55764
    2016-03-18 12:59:48,594 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading a375a60bb40c8f95c786bcffc77c7b7c15a9ddf3 from localhost/127.0.0.1:55764
    2016-03-18 12:59:56,727 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading 6077377ab5a146b9c2f0917d2e1fd03b8cfdbe84 from localhost/127.0.0.1:55764
    ```
    
    It says its downloading the logs locally, but they are loaded from the remote TM, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-197375947
  
    Third batch of changes:
    * TaskManager log file location config key added
    * Test added
    * FileInputStream in handleRequestTaskManagerLog properly closed
    
    I just ran out of issues to address.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-196970622
  
    First batch of comments addressed, the following are still missing:
    * test cases
    * re-implement as something that is not a SimpleChannelInboundHandler
    * possible abstraction to separate jobmanager/dashboard


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56180094
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    --- End diff --
    
    Not if the receiver sent `null` back.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56177903
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    +
    +						//delete previous log file, if it is different than the current one
    +						if ((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).containsKey(taskManagerID)) {
    +							if (!blobKey.equals((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).get(taskManagerID))) {
    +								jobManager.tell(JobManagerMessages.getDeleteTaskManagerLog((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).remove(taskManagerID)));
    +								(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +							}
    +						} else {
    +							(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +						}
    +
    +						//send blobkey to jobmanager
    +						Future<Object> logPathFuture = jobManager.ask(JobManagerMessages.getRequestTaskManagerLog(blobKey), timeout);
    +						String filePath = (String) Await.result(logPathFuture, timeout);
    +
    +						File file = new File(filePath);
    +						final RandomAccessFile raf;
    +						try {
    +							raf = new RandomAccessFile(file, "r");
    +						} catch (FileNotFoundException e) {
    +							sendError(ctx, NOT_FOUND);
    +							return;
    +						}
    +						long fileLength = raf.length();
    +
    +						HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    +						setContentTypeHeader(response, file);
    +
    +						if (HttpHeaders.isKeepAlive(request)) {
    +							response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
    +						}
    +						HttpHeaders.setContentLength(response, fileLength);
    +
    +						// write the initial line and the header.
    +						ctx.write(response);
    +
    +						// write the content.
    +						ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise())
    +							.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
    +								@Override
    +								public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
    +									synchronized (lastRequestCompleted) {
    +										lastRequestCompleted.put(taskManagerID, true);
    +									}
    +								}
    +							});
    +						ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
    +
    +						// close the connection, if no keep-alive is needed
    +						if (!HttpHeaders.isKeepAlive(request)) {
    +							lastContentFuture.addListener(ChannelFutureListener.CLOSE);
    +						}
    +					} catch (Exception e) {
    +						logger.error("Serving of taskmanager log failed: " + e.getMessage());
    +					}
    +				}
    +			}, executor);
    +		} else {
    +			display(ctx, request, "loading...");
    +			return;
    --- End diff --
    
    what exactly?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56171474
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    --- End diff --
    
    I think it would be better to use an enum here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56162762
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -310,6 +310,12 @@ class TaskManager(
     
         case FatalError(message, cause) =>
           killTaskManagerFatal(message, cause)
    +
    +    case RequestTaskManagerLog(requestType : LogTypeRequest) =>
    +      handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
    +
    +    case IsBlobServiceDefined() =>
    +      sender() ! (blobService.isDefined && currentJobManager.get.path.address.host.isDefined)
    --- End diff --
    
    this is not the case when using the start-local.sh script.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56184137
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    --- End diff --
    
    But it is not intuitive that if the `serveLogFile` is false, then the stdout file is served. I would expect that if `serveLogFile` is false, that the log file won't be served. This becomes obvious if you look at the instantiation of the `TaskManagerLogHandler` where you have an boolean value as parameter which one does not understand if you don't look in the code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-198344319
  
    I just tested the change on a cluster, also with larger log files, and it seems to work very well.
    The only thing I found was that clicking the "Download" button on the top right lead to the following error: "Failure: 404 Not Found"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-205877524
  
    Rebased.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56157213
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    --- End diff --
    
    Formatting is off here. I would suggest
    
    ```
    private def xyz(
        a: A,
        b: B)
      : C = {
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56155829
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    --- End diff --
    
    The logger should probably be of `TaskManagerLogHandler`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56156445
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    +      //create new Thread to upload log, to not block the TM for too long
    +      new Thread() {
    +        override def run(): Unit = {
    +          val jmHost = jobManager.path.address.host.get
    +          val port = blobService.get.getPort
    +          val logFilePath = System.getProperty("log.file");
    --- End diff --
    
    I don't think we can rely on the system property to be set correctly.
    I think the log directory is in some configuration


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56181072
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    +
    +						//delete previous log file, if it is different than the current one
    +						if ((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).containsKey(taskManagerID)) {
    +							if (!blobKey.equals((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).get(taskManagerID))) {
    +								jobManager.tell(JobManagerMessages.getDeleteTaskManagerLog((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).remove(taskManagerID)));
    +								(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +							}
    +						} else {
    +							(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +						}
    +
    +						//send blobkey to jobmanager
    +						Future<Object> logPathFuture = jobManager.ask(JobManagerMessages.getRequestTaskManagerLog(blobKey), timeout);
    +						String filePath = (String) Await.result(logPathFuture, timeout);
    +
    +						File file = new File(filePath);
    +						final RandomAccessFile raf;
    +						try {
    +							raf = new RandomAccessFile(file, "r");
    +						} catch (FileNotFoundException e) {
    +							sendError(ctx, NOT_FOUND);
    +							return;
    +						}
    +						long fileLength = raf.length();
    +
    +						HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    +						setContentTypeHeader(response, file);
    +
    +						if (HttpHeaders.isKeepAlive(request)) {
    +							response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
    +						}
    +						HttpHeaders.setContentLength(response, fileLength);
    +
    +						// write the initial line and the header.
    +						ctx.write(response);
    +
    +						// write the content.
    +						ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise())
    +							.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
    +								@Override
    +								public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception {
    +									synchronized (lastRequestCompleted) {
    +										lastRequestCompleted.put(taskManagerID, true);
    +									}
    +								}
    +							});
    +						ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
    +
    +						// close the connection, if no keep-alive is needed
    +						if (!HttpHeaders.isKeepAlive(request)) {
    +							lastContentFuture.addListener(ChannelFutureListener.CLOSE);
    +						}
    +					} catch (Exception e) {
    +						logger.error("Serving of taskmanager log failed: " + e.getMessage());
    +					}
    +				}
    +			}, executor);
    +		} else {
    +			display(ctx, request, "loading...");
    +			return;
    --- End diff --
    
    `return;`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56157057
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala ---
    @@ -114,6 +114,16 @@ object TaskManagerMessages {
         */
       case class JobManagerLeaderAddress(jobManagerAddress: String, leaderSessionID: UUID)
     
    +  sealed trait LogTypeRequest
    +
    +  case object LogFileRequest extends LogTypeRequest
    +
    +  case object StdOutFileRequest extends LogTypeRequest
    +
    +  case class RequestTaskManagerLog(requestType : LogTypeRequest)
    +
    +  case class IsBlobServiceDefined()
    --- End diff --
    
    should be a case object


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56161954
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    --- End diff --
    
    this check is made specifically for the start-local.sh script, for which the jobManager does not have a defined address.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-206823491
  
    There appears to be a problem with the test, looking into it...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56172080
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    +
    +						//delete previous log file, if it is different than the current one
    +						if ((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).containsKey(taskManagerID)) {
    +							if (!blobKey.equals((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).get(taskManagerID))) {
    +								jobManager.tell(JobManagerMessages.getDeleteTaskManagerLog((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).remove(taskManagerID)));
    +								(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +							}
    +						} else {
    +							(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +						}
    --- End diff --
    
    I think this code block would be a bit easier to read if we refrained from using too much nesting and ternary operators.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56172226
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    --- End diff --
    
    `success` can also be null


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56171395
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    --- End diff --
    
    Why do we implement a new `SimpleChannelInboundHandler` instead of a `RequestHandler`. That latter way would avoid a considerable amount of redundant code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56172183
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    +
    +						//delete previous log file, if it is different than the current one
    +						if ((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).containsKey(taskManagerID)) {
    +							if (!blobKey.equals((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).get(taskManagerID))) {
    +								jobManager.tell(JobManagerMessages.getDeleteTaskManagerLog((serveLogFile ? lastSubmittedLog : lastSubmittedStdout).remove(taskManagerID)));
    +								(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +							}
    +						} else {
    +							(serveLogFile ? lastSubmittedLog : lastSubmittedStdout).put(taskManagerID, blobKey);
    +						}
    +
    +						//send blobkey to jobmanager
    +						Future<Object> logPathFuture = jobManager.ask(JobManagerMessages.getRequestTaskManagerLog(blobKey), timeout);
    +						String filePath = (String) Await.result(logPathFuture, timeout);
    --- End diff --
    
    Blocking operation. It would be better to use future composition here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-196486496
  
    Thanks for the PR and the description. Could you add screenshots so that more people can get a quick idea of the change? :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-198407095
  
    The download button should be functional now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56180171
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    +					try {
    +						BlobKey blobKey = (BlobKey) success;
    --- End diff --
    
    You should first check for the failure case and then handle the success case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56171763
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    --- End diff --
    
    This is a blocking operation which we want to avoid in the a request handler. But using future composition to do the job.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56160183
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    +      //create new Thread to upload log, to not block the TM for too long
    +      new Thread() {
    +        override def run(): Unit = {
    +          val jmHost = jobManager.path.address.host.get
    +          val port = blobService.get.getPort
    +          val logFilePath = System.getProperty("log.file");
    +          val file: File = requestType match {
    +            case LogFileRequest => new File(logFilePath);
    +            case StdOutFileRequest =>
    +              new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
    +          }
    +
    +          val client: BlobClient = new BlobClient(new InetSocketAddress(jmHost, port))
    +
    +          sender ! client.put(new FileInputStream(file))
    +        }
    +      }.run()
    +    } else {
    +      throw new IOException("No BlobService defined, cannot upload TaskManager logs.")
    --- End diff --
    
    Throwing this exception here will terminate the TM. I think the exception should rather be send to the `sender` where it is logged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56157449
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    +      //create new Thread to upload log, to not block the TM for too long
    +      new Thread() {
    +        override def run(): Unit = {
    +          val jmHost = jobManager.path.address.host.get
    +          val port = blobService.get.getPort
    +          val logFilePath = System.getProperty("log.file");
    --- End diff --
    
    i looked through the ConfigConstants class and couldn't find a key specific to the log files, as in equivalent to JOB_MANAGER_WEB_LOG_PATH_KEY.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1790#issuecomment-196850069
  
    I think the PR goes in the right direction @zentol. I had some comments concerning the concurrent implementation of the `TaskManagerLogHandler` and on the side of the `TaskManager`. These should be addressed before merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56172245
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +/*****************************************************************************
    + * This code is based on the "HttpStaticFileServerHandler" from the
    + * Netty project's HTTP server example.
    + *
    + * See http://netty.io and
    + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
    + *****************************************************************************/
    +
    +import akka.dispatch.OnComplete;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.KeepAliveWrite;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.util.CharsetUtil;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.TaskManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.runtime.webmonitor.files.MimeTypes;
    +import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    +import org.apache.flink.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContextExecutor;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.RandomAccessFile;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Request handler that returns the TaskManager log/out files.
    + *
    + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
    + * example.</p>
    + */
    +@ChannelHandler.Sharable
    +public class TaskManagerLogHandler extends SimpleChannelInboundHandler<Routed> {
    +
    +	/** Default logger, if none is specified */
    +	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
    +
    +	// ------------------------------------------------------------------------
    +
    +	/** JobManager retriever */
    +	private final JobManagerRetriever retriever;
    +
    +	private final Future<String> localJobManagerAddressFuture;
    +
    +	private final FiniteDuration timeout;
    +
    +	/** Keep track of last transmitted log, to clean up old ones */
    +	private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
    +	private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
    +
    +	/** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
    +	private final HashMap<String, Boolean> lastRequestCompleted = new HashMap<>();
    +
    +	/** The log for all error reporting */
    +	private final Logger logger;
    +
    +	/** indicates which log file should be displayed; true indicates .log, false indicates .out */
    +	private final boolean serveLogFile;
    +	private final ExecutionContextExecutor executor;
    +
    +	private String localJobManagerAddress;
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressPromise,
    +		FiniteDuration timeout,
    +		boolean serveLogFile) throws IOException {
    +
    +		this(retriever, executor, localJobManagerAddressPromise, timeout, DEFAULT_LOGGER, serveLogFile);
    +	}
    +
    +	public TaskManagerLogHandler(
    +		JobManagerRetriever retriever,
    +		ExecutionContextExecutor executor,
    +		Future<String> localJobManagerAddressFuture,
    +		FiniteDuration timeout,
    +		Logger logger, boolean serveLogFile) throws IOException {
    +
    +		this.retriever = checkNotNull(retriever);
    +		this.executor = checkNotNull(executor);
    +		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
    +		this.timeout = checkNotNull(timeout);
    +		this.logger = checkNotNull(logger);
    +		this.serveLogFile = serveLogFile;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Responses to requests
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
    +		if (localJobManagerAddressFuture.isCompleted()) {
    +			if (localJobManagerAddress == null) {
    +				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
    +			}
    +
    +			final HttpRequest request = routed.request();
    +
    +			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
    +
    +			if (jobManager.isDefined()) {
    +				// Redirect to leader if necessary
    +				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
    +					localJobManagerAddress, jobManager.get());
    +
    +				if (redirectAddress != null) {
    +					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, "");
    +					KeepAliveWrite.flush(ctx, routed.request(), redirect);
    +				} else {
    +					respondAsLeader(ctx, request, routed.pathParams(), jobManager.get()._1());
    +				}
    +			} else {
    +				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +			}
    +		} else {
    +			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
    +		}
    +	}
    +
    +	/**
    +	 * Response when running with leading JobManager.
    +	 */
    +	private void respondAsLeader(final ChannelHandlerContext ctx, final HttpRequest request, final Map<String, String> pathParams, final ActorGateway jobManager) throws Exception {
    +		final String taskManagerID = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
    +
    +		boolean fetch;
    +		synchronized (lastRequestCompleted) {
    +			if (!lastRequestCompleted.containsKey(taskManagerID)) {
    +				lastRequestCompleted.put(taskManagerID, true);
    +			}
    +			fetch = lastRequestCompleted.get(taskManagerID);
    +			lastRequestCompleted.put(taskManagerID, false);
    +		}
    +
    +		//fetch taskmanager logs if no other process is currently doing it
    +		if (fetch) {
    +			//get taskmanager gateway
    +			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID));
    +			Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
    +			JobManagerMessages.TaskManagerInstance instance = (JobManagerMessages.TaskManagerInstance) Await.result(future, timeout);
    +			Instance taskManager = instance.instance().get();
    +
    +			Future<Object> isBlobServiceDefined = taskManager.getActorGateway().ask(TaskManagerMessages.getIsBlobServiceDefined(), timeout);
    +
    +			if (!(Boolean) Await.result(isBlobServiceDefined, timeout)) {
    +				display(ctx, request, "BlobService unavailable, cannot upload TaskManager logs.");
    +				return;
    +			}
    +
    +			//send log request to taskmanager
    +			Future<Object> blobKeyFuture = taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
    +			blobKeyFuture.onComplete(new OnComplete<Object>() {
    +				@Override
    +				public void onComplete(Throwable failure, Object success) throws Throwable {
    --- End diff --
    
    What if the `failure != null`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1790#discussion_r56159087
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -781,6 +787,32 @@ class TaskManager(
         }
       }
     
    +  private def handleRequestTaskManagerLog
    +  (sender: ActorRef, requestType: LogTypeRequest, jobManager: ActorRef):
    +  Unit = {
    +    if (blobService.isDefined && jobManager.path.address.host.isDefined) {
    +      //create new Thread to upload log, to not block the TM for too long
    +      new Thread() {
    +        override def run(): Unit = {
    --- End diff --
    
    Should be system error to where it is written.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---