You are viewing a plain-text version of this content. The canonical link to it (rendered as "here" in the original) was a hyperlink and is not preserved in this plain-text version.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/01 17:01:25 UTC

[flink] 01/07: [FLINK-21104][network] Adding task name to logged network buffers

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4da833ab133ac1248bcf31acd5a5f8e5a6a3897
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Thu Jan 28 17:10:24 2021 +0100

    [FLINK-21104][network] Adding task name to logged network buffers
---
 .../channel/RecoveredChannelStateHandler.java      |  2 --
 .../io/network/logger/NetworkActionsLogger.java    | 23 +++++++++++++++++-----
 .../network/partition/PipelinedSubpartition.java   |  5 ++++-
 .../partition/consumer/LocalInputChannel.java      |  6 ++++--
 .../partition/consumer/RecoveredInputChannel.java  |  6 ++++++
 .../partition/consumer/RemoteInputChannel.java     | 11 +++++++++++
 6 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
index eb72ac7..a307621 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
@@ -67,8 +67,6 @@ class InputChannelRecoveredStateHandler
     @Override
     public void recover(InputChannelInfo channelInfo, Buffer buffer) {
         if (buffer.readableBytes() > 0) {
-            NetworkActionsLogger.traceRecover(
-                    "InputChannelRecoveredStateHandler#recover", buffer, channelInfo);
             getChannel(channelInfo).onRecoveredStateBuffer(buffer);
         } else {
             buffer.recycleBuffer();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java
index 4ecf18c..531a245 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java
@@ -40,12 +40,14 @@ public class NetworkActionsLogger {
     public static void traceInput(
             String action,
             Buffer buffer,
+            String taskName,
             InputChannelInfo channelInfo,
             ChannelStatePersister channelStatePersister,
             int sequenceNumber) {
         if (ENABLED) {
             LOG.trace(
-                    "{} {}, seq {}, {} @ {}",
+                    "[{}] {} {}, seq {}, {} @ {}",
+                    taskName,
                     action,
                     buffer.toDebugString(INCLUDE_HASH),
                     sequenceNumber,
@@ -55,15 +57,26 @@ public class NetworkActionsLogger {
     }
 
     public static void traceOutput(
-            String action, Buffer buffer, ResultSubpartitionInfo channelInfo) {
+            String action, Buffer buffer, String taskName, ResultSubpartitionInfo channelInfo) {
         if (ENABLED) {
-            LOG.trace("{} {} @ {}", action, buffer.toDebugString(INCLUDE_HASH), channelInfo);
+            LOG.trace(
+                    "[{}] {} {} @ {}",
+                    taskName,
+                    action,
+                    buffer.toDebugString(INCLUDE_HASH),
+                    channelInfo);
         }
     }
 
-    public static void traceRecover(String action, Buffer buffer, InputChannelInfo channelInfo) {
+    public static void traceRecover(
+            String action, Buffer buffer, String taskName, InputChannelInfo channelInfo) {
         if (ENABLED) {
-            LOG.trace("{} {} @ {}", action, buffer.toDebugString(INCLUDE_HASH), channelInfo);
+            LOG.trace(
+                    "[{}] {} {} @ {}",
+                    taskName,
+                    action,
+                    buffer.toDebugString(INCLUDE_HASH),
+                    channelInfo);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 422c832..01fbdd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -325,7 +325,10 @@ public class PipelinedSubpartition extends ResultSubpartition
             // queue
             // will be 2 or more.
             NetworkActionsLogger.traceOutput(
-                    "PipelinedSubpartition#pollBuffer", buffer, subpartitionInfo);
+                    "PipelinedSubpartition#pollBuffer",
+                    buffer,
+                    parent.getOwningTaskName(),
+                    subpartitionInfo);
             return new BufferAndBacklog(
                     buffer,
                     getBuffersInBacklog(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 21f415b..fecfff9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -143,10 +143,11 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
             if (subpartitionView == null) {
                 LOG.debug(
-                        "{}: Requesting LOCAL subpartition {} of partition {}.",
+                        "{}: Requesting LOCAL subpartition {} of partition {}. {}",
                         this,
                         subpartitionIndex,
-                        partitionId);
+                        partitionId,
+                        channelStatePersister);
 
                 try {
                     ResultSubpartitionView subpartitionView =
@@ -262,6 +263,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
         NetworkActionsLogger.traceInput(
                 "LocalInputChannel#getNextBuffer",
                 buffer,
+                inputGate.getOwningTaskName(),
                 channelInfo,
                 channelStatePersister,
                 next.getSequenceNumber());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index a70e74d..d163221 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
 import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.util.Preconditions;
@@ -120,6 +121,11 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
 
     public void onRecoveredStateBuffer(Buffer buffer) {
         boolean recycleBuffer = true;
+        NetworkActionsLogger.traceRecover(
+                "InputChannelRecoveredStateHandler#recover",
+                buffer,
+                inputGate.getOwningTaskName(),
+                channelInfo);
         try {
             final boolean wasEmpty;
             synchronized (receivedBuffers) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 3090f47..ac437ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -42,6 +42,9 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -60,6 +63,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /** An input channel, which requests a remote partition queue. */
 public class RemoteInputChannel extends InputChannel {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class);
 
     private static final int NONE = -1;
 
@@ -163,6 +167,12 @@ public class RemoteInputChannel extends InputChannel {
     public void requestSubpartition(int subpartitionIndex)
             throws IOException, InterruptedException {
         if (partitionRequestClient == null) {
+            LOG.debug(
+                    "{}: Requesting REMOTE subpartition {} of partition {}. {}",
+                    this,
+                    subpartitionIndex,
+                    partitionId,
+                    channelStatePersister);
             // Create a client and request the partition
             try {
                 partitionRequestClient =
@@ -448,6 +458,7 @@ public class RemoteInputChannel extends InputChannel {
                 NetworkActionsLogger.traceInput(
                         "RemoteInputChannel#onBuffer",
                         buffer,
+                        inputGate.getOwningTaskName(),
                         channelInfo,
                         channelStatePersister,
                         sequenceNumber);