You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/01 17:01:25 UTC
[flink] 01/07: [FLINK-21104][network] Adding task name to logged
network buffers
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d4da833ab133ac1248bcf31acd5a5f8e5a6a3897
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Thu Jan 28 17:10:24 2021 +0100
[FLINK-21104][network] Adding task name to logged network buffers
---
.../channel/RecoveredChannelStateHandler.java | 2 --
.../io/network/logger/NetworkActionsLogger.java | 23 +++++++++++++++++-----
.../network/partition/PipelinedSubpartition.java | 5 ++++-
.../partition/consumer/LocalInputChannel.java | 6 ++++--
.../partition/consumer/RecoveredInputChannel.java | 6 ++++++
.../partition/consumer/RemoteInputChannel.java | 11 +++++++++++
6 files changed, 43 insertions(+), 10 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
index eb72ac7..a307621 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
@@ -67,8 +67,6 @@ class InputChannelRecoveredStateHandler
@Override
public void recover(InputChannelInfo channelInfo, Buffer buffer) {
if (buffer.readableBytes() > 0) {
- NetworkActionsLogger.traceRecover(
- "InputChannelRecoveredStateHandler#recover", buffer, channelInfo);
getChannel(channelInfo).onRecoveredStateBuffer(buffer);
} else {
buffer.recycleBuffer();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java
index 4ecf18c..531a245 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/logger/NetworkActionsLogger.java
@@ -40,12 +40,14 @@ public class NetworkActionsLogger {
public static void traceInput(
String action,
Buffer buffer,
+ String taskName,
InputChannelInfo channelInfo,
ChannelStatePersister channelStatePersister,
int sequenceNumber) {
if (ENABLED) {
LOG.trace(
- "{} {}, seq {}, {} @ {}",
+ "[{}] {} {}, seq {}, {} @ {}",
+ taskName,
action,
buffer.toDebugString(INCLUDE_HASH),
sequenceNumber,
@@ -55,15 +57,26 @@ public class NetworkActionsLogger {
}
public static void traceOutput(
- String action, Buffer buffer, ResultSubpartitionInfo channelInfo) {
+ String action, Buffer buffer, String taskName, ResultSubpartitionInfo channelInfo) {
if (ENABLED) {
- LOG.trace("{} {} @ {}", action, buffer.toDebugString(INCLUDE_HASH), channelInfo);
+ LOG.trace(
+ "[{}] {} {} @ {}",
+ taskName,
+ action,
+ buffer.toDebugString(INCLUDE_HASH),
+ channelInfo);
}
}
- public static void traceRecover(String action, Buffer buffer, InputChannelInfo channelInfo) {
+ public static void traceRecover(
+ String action, Buffer buffer, String taskName, InputChannelInfo channelInfo) {
if (ENABLED) {
- LOG.trace("{} {} @ {}", action, buffer.toDebugString(INCLUDE_HASH), channelInfo);
+ LOG.trace(
+ "[{}] {} {} @ {}",
+ taskName,
+ action,
+ buffer.toDebugString(INCLUDE_HASH),
+ channelInfo);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 422c832..01fbdd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -325,7 +325,10 @@ public class PipelinedSubpartition extends ResultSubpartition
// queue
// will be 2 or more.
NetworkActionsLogger.traceOutput(
- "PipelinedSubpartition#pollBuffer", buffer, subpartitionInfo);
+ "PipelinedSubpartition#pollBuffer",
+ buffer,
+ parent.getOwningTaskName(),
+ subpartitionInfo);
return new BufferAndBacklog(
buffer,
getBuffersInBacklog(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 21f415b..fecfff9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -143,10 +143,11 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
if (subpartitionView == null) {
LOG.debug(
- "{}: Requesting LOCAL subpartition {} of partition {}.",
+ "{}: Requesting LOCAL subpartition {} of partition {}. {}",
this,
subpartitionIndex,
- partitionId);
+ partitionId,
+ channelStatePersister);
try {
ResultSubpartitionView subpartitionView =
@@ -262,6 +263,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
NetworkActionsLogger.traceInput(
"LocalInputChannel#getNextBuffer",
buffer,
+ inputGate.getOwningTaskName(),
channelInfo,
channelStatePersister,
next.getSequenceNumber());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index a70e74d..d163221 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.util.Preconditions;
@@ -120,6 +121,11 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
public void onRecoveredStateBuffer(Buffer buffer) {
boolean recycleBuffer = true;
+ NetworkActionsLogger.traceRecover(
+ "InputChannelRecoveredStateHandler#recover",
+ buffer,
+ inputGate.getOwningTaskName(),
+ channelInfo);
try {
final boolean wasEmpty;
synchronized (receivedBuffers) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 3090f47..ac437ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -42,6 +42,9 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@@ -60,6 +63,7 @@ import static org.apache.flink.util.Preconditions.checkState;
/** An input channel, which requests a remote partition queue. */
public class RemoteInputChannel extends InputChannel {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class);
private static final int NONE = -1;
@@ -163,6 +167,12 @@ public class RemoteInputChannel extends InputChannel {
public void requestSubpartition(int subpartitionIndex)
throws IOException, InterruptedException {
if (partitionRequestClient == null) {
+ LOG.debug(
+ "{}: Requesting REMOTE subpartition {} of partition {}. {}",
+ this,
+ subpartitionIndex,
+ partitionId,
+ channelStatePersister);
// Create a client and request the partition
try {
partitionRequestClient =
@@ -448,6 +458,7 @@ public class RemoteInputChannel extends InputChannel {
NetworkActionsLogger.traceInput(
"RemoteInputChannel#onBuffer",
buffer,
+ inputGate.getOwningTaskName(),
channelInfo,
channelStatePersister,
sequenceNumber);