You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/10 09:23:20 UTC
[flink] branch master updated: [FLINK-10131][network] improve
logging around subpartitions
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5f8d91f [FLINK-10131][network] improve logging around subpartitions
5f8d91f is described below
commit 5f8d91faeb9b2ab8feb919c717935065b473546c
Author: Nico Kruber <ni...@gmail.com>
AuthorDate: Mon Sep 10 11:14:01 2018 +0200
[FLINK-10131][network] improve logging around subpartitions
- add task name
- add subpartition index
This closes #6547.
---
.../runtime/io/network/partition/PipelinedSubpartition.java | 11 ++++++-----
.../flink/runtime/io/network/partition/ResultPartition.java | 4 ++++
.../runtime/io/network/partition/SpillableSubpartition.java | 11 +++++++----
3 files changed, 17 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index cc79363..c6f3e15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -80,7 +80,7 @@ class PipelinedSubpartition extends ResultSubpartition {
@Override
public void finish() throws IOException {
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
- LOG.debug("Finished {}.", this);
+ LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
}
private boolean add(BufferConsumer bufferConsumer, boolean finish) {
@@ -132,7 +132,7 @@ class PipelinedSubpartition extends ResultSubpartition {
isReleased = true;
}
- LOG.debug("Released {}.", this);
+ LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
if (view != null) {
view.releaseAllResources();
@@ -224,7 +224,8 @@ class PipelinedSubpartition extends ResultSubpartition {
"Subpartition %s of is being (or already has been) consumed, " +
"but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());
- LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId());
+ LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+ parent.getOwningTaskName(), index, parent.getPartitionId());
readView = new PipelinedSubpartitionView(this, availabilityListener);
if (!buffers.isEmpty()) {
@@ -268,8 +269,8 @@ class PipelinedSubpartition extends ResultSubpartition {
}
return String.format(
- "PipelinedSubpartition [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
- numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
+ "PipelinedSubpartition#%d [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
+ index, numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index fbbfa4b..93e5ba1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -199,6 +199,10 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
return jobId;
}
+ public String getOwningTaskName() {
+ return owningTaskName;
+ }
+
public ResultPartitionID getPartitionId() {
return partitionId;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 69b461b..9f696ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -143,6 +143,7 @@ class SpillableSubpartition extends ResultSubpartition {
if (spillWriter != null) {
spillWriter.close();
}
+ LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
}
@Override
@@ -180,6 +181,8 @@ class SpillableSubpartition extends ResultSubpartition {
isReleased = true;
}
+ LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
+
if (view != null) {
view.releaseAllResources();
}
@@ -236,8 +239,8 @@ class SpillableSubpartition extends ResultSubpartition {
long spilledBytes = spillFinishedBufferConsumers(isFinished);
int spilledBuffers = numberOfBuffers - buffers.size();
- LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.",
- spilledBytes, spilledBuffers, index, parent.getPartitionId());
+ LOG.debug("{}: Spilling {} bytes ({} buffers} for sub partition {} of {}.",
+ parent.getOwningTaskName(), spilledBytes, spilledBuffers, index, parent.getPartitionId());
return spilledBuffers;
}
@@ -300,9 +303,9 @@ class SpillableSubpartition extends ResultSubpartition {
@Override
public String toString() {
- return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," +
+ return String.format("SpillableSubpartition#%d [%d number of buffers (%d bytes)," +
"%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]",
- getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
+ index, getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
getBuffersInBacklog(), isFinished, readView != null, spillWriter != null);
}