You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/10 09:23:20 UTC

[flink] branch master updated: [FLINK-10131][network] improve logging around subpartitions

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f8d91f  [FLINK-10131][network] improve logging around subpartitions
5f8d91f is described below

commit 5f8d91faeb9b2ab8feb919c717935065b473546c
Author: Nico Kruber <ni...@gmail.com>
AuthorDate: Mon Sep 10 11:14:01 2018 +0200

    [FLINK-10131][network] improve logging around subpartitions
    
    - add task name
    - add subpartition index
    
    This closes #6547.
---
 .../runtime/io/network/partition/PipelinedSubpartition.java   | 11 ++++++-----
 .../flink/runtime/io/network/partition/ResultPartition.java   |  4 ++++
 .../runtime/io/network/partition/SpillableSubpartition.java   | 11 +++++++----
 3 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index cc79363..c6f3e15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -80,7 +80,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 	@Override
 	public void finish() throws IOException {
 		add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
-		LOG.debug("Finished {}.", this);
+		LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
 	}
 
 	private boolean add(BufferConsumer bufferConsumer, boolean finish) {
@@ -132,7 +132,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 			isReleased = true;
 		}
 
-		LOG.debug("Released {}.", this);
+		LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
 
 		if (view != null) {
 			view.releaseAllResources();
@@ -224,7 +224,8 @@ class PipelinedSubpartition extends ResultSubpartition {
 					"Subpartition %s of is being (or already has been) consumed, " +
 					"but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());
 
-			LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId());
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), index, parent.getPartitionId());
 
 			readView = new PipelinedSubpartitionView(this, availabilityListener);
 			if (!buffers.isEmpty()) {
@@ -268,8 +269,8 @@ class PipelinedSubpartition extends ResultSubpartition {
 		}
 
 		return String.format(
-			"PipelinedSubpartition [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
-			numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
+			"PipelinedSubpartition#%d [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
+			index, numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index fbbfa4b..93e5ba1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -199,6 +199,10 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		return jobId;
 	}
 
+	public String getOwningTaskName() {
+		return owningTaskName;
+	}
+
 	public ResultPartitionID getPartitionId() {
 		return partitionId;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 69b461b..9f696ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -143,6 +143,7 @@ class SpillableSubpartition extends ResultSubpartition {
 		if (spillWriter != null) {
 			spillWriter.close();
 		}
+		LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
 	}
 
 	@Override
@@ -180,6 +181,8 @@ class SpillableSubpartition extends ResultSubpartition {
 			isReleased = true;
 		}
 
+		LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
+
 		if (view != null) {
 			view.releaseAllResources();
 		}
@@ -236,8 +239,8 @@ class SpillableSubpartition extends ResultSubpartition {
 				long spilledBytes = spillFinishedBufferConsumers(isFinished);
 				int spilledBuffers = numberOfBuffers - buffers.size();
 
-				LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.",
-					spilledBytes, spilledBuffers, index, parent.getPartitionId());
+				LOG.debug("{}: Spilling {} bytes ({} buffers} for sub partition {} of {}.",
+					parent.getOwningTaskName(), spilledBytes, spilledBuffers, index, parent.getPartitionId());
 
 				return spilledBuffers;
 			}
@@ -300,9 +303,9 @@ class SpillableSubpartition extends ResultSubpartition {
 
 	@Override
 	public String toString() {
-		return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," +
+		return String.format("SpillableSubpartition#%d [%d number of buffers (%d bytes)," +
 				"%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]",
-			getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
+			index, getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
 			getBuffersInBacklog(), isFinished, readView != null, spillWriter != null);
 	}