You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/10 09:14:03 UTC

[GitHub] NicoK closed pull request #6547: [FLINK-10131][network] improve logging around subpartitions

NicoK closed pull request #6547: [FLINK-10131][network] improve logging around subpartitions
URL: https://github.com/apache/flink/pull/6547
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based (foreign) pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index cc793635047..c6f3e158519 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -80,7 +80,7 @@ public void flush() {
 	@Override
 	public void finish() throws IOException {
 		add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
-		LOG.debug("Finished {}.", this);
+		LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
 	}
 
 	private boolean add(BufferConsumer bufferConsumer, boolean finish) {
@@ -132,7 +132,7 @@ public void release() {
 			isReleased = true;
 		}
 
-		LOG.debug("Released {}.", this);
+		LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
 
 		if (view != null) {
 			view.releaseAllResources();
@@ -224,7 +224,8 @@ public PipelinedSubpartitionView createReadView(BufferAvailabilityListener avail
 					"Subpartition %s of is being (or already has been) consumed, " +
 					"but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());
 
-			LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId());
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), index, parent.getPartitionId());
 
 			readView = new PipelinedSubpartitionView(this, availabilityListener);
 			if (!buffers.isEmpty()) {
@@ -268,8 +269,8 @@ public String toString() {
 		}
 
 		return String.format(
-			"PipelinedSubpartition [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
-			numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
+			"PipelinedSubpartition#%d [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
+			index, numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index fbbfa4b45bb..93e5ba15097 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -199,6 +199,10 @@ public JobID getJobId() {
 		return jobId;
 	}
 
+	public String getOwningTaskName() {
+		return owningTaskName;
+	}
+
 	public ResultPartitionID getPartitionId() {
 		return partitionId;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 69b461b1a4d..9f696adc362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -143,6 +143,7 @@ public synchronized void finish() throws IOException {
 		if (spillWriter != null) {
 			spillWriter.close();
 		}
+		LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
 	}
 
 	@Override
@@ -180,6 +181,8 @@ public synchronized void release() throws IOException {
 			isReleased = true;
 		}
 
+		LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
+
 		if (view != null) {
 			view.releaseAllResources();
 		}
@@ -236,8 +239,8 @@ public int releaseMemory() throws IOException {
 				long spilledBytes = spillFinishedBufferConsumers(isFinished);
 				int spilledBuffers = numberOfBuffers - buffers.size();
 
-				LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.",
-					spilledBytes, spilledBuffers, index, parent.getPartitionId());
+				LOG.debug("{}: Spilling {} bytes ({} buffers} for sub partition {} of {}.",
+					parent.getOwningTaskName(), spilledBytes, spilledBuffers, index, parent.getPartitionId());
 
 				return spilledBuffers;
 			}
@@ -300,9 +303,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 
 	@Override
 	public String toString() {
-		return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," +
+		return String.format("SpillableSubpartition#%d [%d number of buffers (%d bytes)," +
 				"%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]",
-			getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
+			index, getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
 			getBuffersInBacklog(), isFinished, readView != null, spillWriter != null);
 	}
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services