Posted to issues@flink.apache.org by "Chesnay Schepler (Jira)" <ji...@apache.org> on 2021/01/11 14:26:00 UTC

[jira] [Commented] (FLINK-17969) Enhance Flink (Task) logging to include job name as context diagnostic information

    [ https://issues.apache.org/jira/browse/FLINK-17969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17262671#comment-17262671 ] 

Chesnay Schepler commented on FLINK-17969:
------------------------------------------

I have closed the PR due to inactivity, and because we need to look further into how the MDC could work on the RPC side and in thread pools in general, as well as into the overall performance impact.
The proposal doesn't work correctly for shared thread pools that reuse threads, since it would leak the MDC across jobs.
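To illustrate the leak, here is a minimal sketch, not Flink code. It assumes a hypothetical {{Mdc}} class (a {{ThreadLocal}}-backed map standing in for SLF4J's {{MDC}}, so the example has no dependencies) and a hypothetical job name {{job-A}}. A task that puts a value on a pooled thread and never removes it leaves that value behind for the next task scheduled on the same thread, even if that task belongs to a different job.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MdcLeakDemo {

    // Hypothetical stand-in for org.slf4j.MDC: a per-thread key/value map.
    static final class Mdc {
        private static final ThreadLocal<Map<String, String>> CONTEXT =
                ThreadLocal.withInitial(HashMap::new);

        static void put(String key, String value) { CONTEXT.get().put(key, value); }

        static String get(String key) { return CONTEXT.get().get(key); }
    }

    /** Runs two tasks on one reused thread and returns what the second task sees. */
    static String jobNameSeenBySecondTask() throws Exception {
        ExecutorService sharedPool = Executors.newSingleThreadExecutor();
        try {
            // Task for job A sets the context but never removes it.
            sharedPool.submit(() -> Mdc.put("jobName", "job-A")).get();
            // Task for another job sets no context, but runs on the same thread.
            return sharedPool.submit(() -> Mdc.get("jobName")).get();
        } finally {
            sharedPool.shutdown();
            sharedPool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("second task sees jobName=" + jobNameSeenBySecondTask());
    }
}
```

Running {{main}} prints {{second task sees jobName=job-A}}: the second task has nothing to do with job A, yet every log line it emits would be attributed to job A.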


For our RPC actors, it seems possible to adjust the MDC within {{AkkaRpcActor#handleMessage}} using information that is constant over the lifetime of an actor (like the job ID for a JobMaster). In addition, we would have to manually wrap certain operations, such as constructors, where necessary.
For something like the ResourceManager, we would have to wrap operations based on the given job ID, if applicable.
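A rough sketch of both variants, under stated assumptions: {{Mdc}}, {{withJobId}} and {{JobScopedActor}} are hypothetical names (this is not the actual {{AkkaRpcActor}} API), {{Mdc}} is a dependency-free stand-in for SLF4J's {{MDC}}, and the key {{jobId}} is an assumed name. The actor variant sets a context that is fixed for the actor's lifetime around every message; the ResourceManager-style variant sets the job ID per call. Both save and restore the thread's previous value instead of simply removing it, so a nested call does not clobber an outer context.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class RpcMdcSketch {

    // Hypothetical stand-in for org.slf4j.MDC: a per-thread key/value map.
    static final class Mdc {
        private static final ThreadLocal<Map<String, String>> CONTEXT =
                ThreadLocal.withInitial(HashMap::new);

        static String get(String key) { return CONTEXT.get().get(key); }

        static void put(String key, String value) { CONTEXT.get().put(key, value); }

        static void remove(String key) { CONTEXT.get().remove(key); }

        /** Restores a previously seen value, removing the key if there was none. */
        static void restore(String key, String previous) {
            if (previous == null) {
                remove(key);
            } else {
                put(key, previous);
            }
        }
    }

    /** Runs an operation with "jobId" set, then restores the thread's previous value. */
    static <T> T withJobId(String jobId, Supplier<T> operation) {
        String previous = Mdc.get("jobId");
        Mdc.put("jobId", jobId);
        try {
            return operation.get();
        } finally {
            Mdc.restore("jobId", previous);
        }
    }

    /** Hypothetical actor whose job ID is fixed for its lifetime, like a JobMaster. */
    static final class JobScopedActor<M> {
        private final String jobId;
        private final Consumer<M> handler;

        JobScopedActor(String jobId, Consumer<M> handler) {
            this.jobId = jobId;
            this.handler = handler;
        }

        /** Every message is handled inside this actor's job context. */
        void handleMessage(M message) {
            withJobId(jobId, () -> {
                handler.accept(message);
                return null;
            });
        }
    }

    public static void main(String[] args) {
        JobScopedActor<String> jobMaster = new JobScopedActor<>("job-1",
                msg -> System.out.println(msg + " handled with jobId=" + Mdc.get("jobId")));
        jobMaster.handleMessage("heartbeat");

        // ResourceManager-style: the job ID comes from the request, not from the actor.
        String seen = withJobId("job-2", () -> Mdc.get("jobId"));
        System.out.println("per-call jobId=" + seen + ", afterwards=" + Mdc.get("jobId"));
    }
}
```

Running {{main}} prints {{heartbeat handled with jobId=job-1}} followed by {{per-call jobId=job-2, afterwards=null}}. Constructors and other code paths outside the message handler would still need explicit wrapping.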

The main task thread can be covered like in the proposed PR.

Thread pools are where things get ugly: we can't tie things to the creation of threads since threads may be reused, so the MDC adjustments have to be encoded into the runnable. This implies that we either have to adjust all runnables to do this, or wrap all {{Executor[Services]}} and have them do the wrapping. This also means we'll copy two maps for every single runnable.
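The executor-wrapping option could look roughly like the following sketch. {{MdcPropagatingExecutor}} and the dependency-free {{Mdc}} stand-in are hypothetical; with real SLF4J the copy/set steps would correspond to {{MDC.getCopyOfContextMap()}} and {{MDC.setContextMap(...)}}, keeping in mind that the former may return {{null}}. One plausible reading of the "two maps" is visible here: the submitter's context is copied when the runnable is submitted, and the worker thread's own context is copied so that it can be restored once the runnable finishes, which prevents the cross-job leak.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MdcPropagatingExecutor implements Executor {

    // Hypothetical stand-in for org.slf4j.MDC: a per-thread key/value map.
    static final class Mdc {
        private static final ThreadLocal<Map<String, String>> CONTEXT =
                ThreadLocal.withInitial(HashMap::new);

        static String get(String key) { return CONTEXT.get().get(key); }

        static void put(String key, String value) { CONTEXT.get().put(key, value); }

        static Map<String, String> copy() { return new HashMap<>(CONTEXT.get()); }

        static void set(Map<String, String> context) { CONTEXT.set(new HashMap<>(context)); }
    }

    private final Executor delegate;

    public MdcPropagatingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable command) {
        // Map copy #1: the submitting thread's context, captured at submission time.
        Map<String, String> submitterContext = Mdc.copy();
        delegate.execute(() -> {
            // Map copy #2: the worker thread's own context, so it can be restored.
            Map<String, String> workerContext = Mdc.copy();
            Mdc.set(submitterContext);
            try {
                command.run();
            } finally {
                // Restore the worker's context so nothing leaks into the next runnable.
                Mdc.set(workerContext);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Executor executor = new MdcPropagatingExecutor(pool);

        Mdc.put("jobName", "job-A");
        executor.execute(() -> System.out.println("task 1 sees " + Mdc.get("jobName")));

        Mdc.put("jobName", "job-B");
        executor.execute(() -> System.out.println("task 2 sees " + Mdc.get("jobName")));

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With a single-threaded pool, {{main}} prints {{task 1 sees job-A}} and then {{task 2 sees job-B}}, because each runnable carries the context captured at submission time. The price is the per-runnable map copying, paid even by runnables that never log.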

We could think about excluding such thread pools from this effort, but I can all but guarantee that users will run into cases where an operation from such a thread pool causes a problem that they never see, because they only filtered the logs for a specific job.


Overall, I feel that implementing this consistently across all components would be a major effort, and I'm not sure how much value we'd get from a partial implementation.

> Enhance Flink (Task) logging to include job name as context diagnostic information
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-17969
>                 URL: https://issues.apache.org/jira/browse/FLINK-17969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>    Affects Versions: 1.10.0
>            Reporter: Bhagavan
>            Assignee: Chesnay Schepler
>            Priority: Trivial
>              Labels: pull-request-available
>
> Problem statement:
> We use a shared session cluster (Standalone/Yarn) to execute jobs. All logs from the cluster are shipped using a log aggregation framework (Logstash/Splunk) to make application diagnostics easier.
> However, one vital piece of information is missing from each log line: the job name, which we need in order to filter the logs for a single job.
> Background
> Currently, Flink logging uses SLF4J as an API to abstract away the concrete logging implementation (log4j 1.x, Logback or log4j2), and the logging pattern and implementation can be configured at deployment. However, the framework provides no MDC information indicating the job context.
> Proposed improvement.
> Add a jobName field to the Task class so that it can be put into the MDC when the task thread starts executing.
> The change is trivial and uses the SLF4J MDC API.
> With this change, users can customise the logging pattern to include the MDC value (e.g. %X{jobName} in Logback).
> Change required.
> {code:java}
> @@ -319,6 +323,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
>  
>                 this.jobId = jobInformation.getJobId();
> +               this.jobName = jobInformation.getJobName();
>                 this.vertexId = taskInformation.getJobVertexId();
> @@ -530,8 +535,10 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
>         @Override
>         public void run() {
>                 try {
> +                       MDC.put("jobName", this.jobName);
>                         doRun();
>                 } finally {
> +                       MDC.remove("jobName");
>                         terminationFuture.complete(executionState);
>                 }
>         }
> {code}
> If we are in agreement on this small change, I will raise a PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)