You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/13 09:57:38 UTC

[flink] 03/03: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5ddb0fbdc41fa8230cff7c54b993e62ddef2447
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Jul 31 14:48:58 2018 +0200

    [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
    
    This closes #6470.
---
 .../io/network/partition/consumer/InputGate.java   |  2 +
 .../partition/consumer/SingleInputGate.java        |  5 ++
 .../network/partition/consumer/UnionInputGate.java |  6 ++
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 64 +++++++++++++++-------
 .../runtime/io/BarrierBufferMassiveRandomTest.java | 12 ++++
 .../flink/streaming/runtime/io/MockInputGate.java  | 12 ++++
 6 files changed, 81 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0413caa..c78abb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -69,6 +69,8 @@ public interface InputGate {
 
 	int getNumberOfInputChannels();
 
+	String getOwningTaskName();
+
 	boolean isFinished();
 
 	void requestPartitions() throws IOException, InterruptedException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index dbef46f..2e7d076 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -274,6 +274,11 @@ public class SingleInputGate implements InputGate {
 		return 0;
 	}
 
+	@Override
+	public String getOwningTaskName() {
+		return owningTaskName;
+	}
+
 	// ------------------------------------------------------------------------
 	// Setup/Life-cycle
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 742592a..d3085cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -130,6 +130,12 @@ public class UnionInputGate implements InputGate, InputGateListener {
 	}
 
 	@Override
+	public String getOwningTaskName() {
+		// all input gates have the same owning task
+		return inputGates[0].getOwningTaskName();
+	}
+
+	@Override
 	public boolean isFinished() {
 		for (InputGate inputGate : inputGates) {
 			if (!inputGate.isFinished()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 78852b8..991635a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -213,7 +213,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	}
 
 	private void completeBufferedSequence() throws IOException {
-		LOG.debug("Finished feeding back buffered data");
+		LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName());
 
 		currentBuffered.cleanup();
 		currentBuffered = queuedBuffered.pollFirst();
@@ -247,8 +247,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			}
 			else if (barrierId > currentCheckpointId) {
 				// we did not complete the current checkpoint, another started before
-				LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
-						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+				LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
+						"Skipping current checkpoint.",
+					inputGate.getOwningTaskName(),
+					barrierId,
+					currentCheckpointId);
 
 				// let the task know we are not completing this
 				notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
@@ -279,8 +282,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
 			// actually trigger checkpoint
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received all barriers, triggering checkpoint {} at {}",
-						receivedBarrier.getId(), receivedBarrier.getTimestamp());
+				LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
+					inputGate.getOwningTaskName(),
+					receivedBarrier.getId(),
+					receivedBarrier.getTimestamp());
 			}
 
 			releaseBlocksAndResetBarriers();
@@ -309,7 +314,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			if (barrierId == currentCheckpointId) {
 				// cancel this alignment
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId);
+					LOG.debug("{}: Checkpoint {} canceled, aborting alignment.",
+						inputGate.getOwningTaskName(),
+						barrierId);
 				}
 
 				releaseBlocksAndResetBarriers();
@@ -317,8 +324,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			}
 			else if (barrierId > currentCheckpointId) {
 				// we canceled the next which also cancels the current
-				LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
-						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+				LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
+						"Skipping current checkpoint.",
+					inputGate.getOwningTaskName(),
+					barrierId,
+					currentCheckpointId);
 
 				// this stops the current alignment
 				releaseBlocksAndResetBarriers();
@@ -347,7 +357,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			latestAlignmentDurationNanos = 0L;
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
+				LOG.debug("{}: Checkpoint {} canceled, skipping alignment.",
+					inputGate.getOwningTaskName(),
+					barrierId);
 			}
 
 			notifyAbortOnCancellationBarrier(barrierId);
@@ -401,8 +413,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private void checkSizeLimit() throws Exception {
 		if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
 			// exceeded our limit - abort this checkpoint
-			LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded",
-					currentCheckpointId, maxBufferedBytes);
+			LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
+				inputGate.getOwningTaskName(),
+				currentCheckpointId,
+				maxBufferedBytes);
 
 			releaseBlocksAndResetBarriers();
 			notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes));
@@ -444,7 +458,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		startOfAlignmentTimestamp = System.nanoTime();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.');
+			LOG.debug("{}: Starting stream alignment for checkpoint {}.",
+				inputGate.getOwningTaskName(),
+				checkpointId);
 		}
 	}
 
@@ -470,7 +486,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			numBarriersReceived++;
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received barrier from channel " + channelIndex);
+				LOG.debug("{}: Received barrier from channel {}.",
+					inputGate.getOwningTaskName(),
+					channelIndex);
 			}
 		}
 		else {
@@ -483,7 +501,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 * Makes sure the just written data is the next to be consumed.
 	 */
 	private void releaseBlocksAndResetBarriers() throws IOException {
-		LOG.debug("End of stream alignment, feeding buffered data back");
+		LOG.debug("{}: End of stream alignment, feeding buffered data back.",
+			inputGate.getOwningTaskName());
 
 		for (int i = 0; i < blockedChannels.length; i++) {
 			blockedChannels[i] = false;
@@ -499,8 +518,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		else {
 			// uncommon case: buffered data pending
 			// push back the pending data, if we have any
-			LOG.debug("Checkpoint skipped via buffered data:" +
-					"Pushing back current alignment buffers and feeding back new alignment data first.");
+			LOG.debug("{}: Checkpoint skipped via buffered data: " +
+					"Pushing back current alignment buffers and feeding back new alignment data first.",
+				inputGate.getOwningTaskName());
 
 			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
 			BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
@@ -513,8 +533,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Size of buffered data: {} bytes",
-					currentBuffered == null ? 0L : currentBuffered.size());
+			LOG.debug("{}: Size of buffered data: {} bytes",
+				inputGate.getOwningTaskName(),
+				currentBuffered == null ? 0L : currentBuffered.size());
 		}
 
 		// the next barrier that comes must assume it is the first
@@ -555,7 +576,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	@Override
 	public String toString() {
-		return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
-				currentCheckpointId, numBarriersReceived, numClosedChannels);
+		return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d",
+			inputGate.getOwningTaskName(),
+			currentCheckpointId,
+			numBarriersReceived,
+			numClosedChannels);
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 6dd1e5e..e968101 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -139,11 +139,18 @@ public class BarrierBufferMassiveRandomTest {
 		private int currentChannel = 0;
 		private long c = 0;
 
+		private final String owningTaskName;
+
 		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+			this(bufferPools, barrierGens, "TestTask");
+		}
+
+		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens, String owningTaskName) {
 			this.numChannels = bufferPools.length;
 			this.currentBarriers = new int[numChannels];
 			this.bufferPools = bufferPools;
 			this.barrierGens = barrierGens;
+			this.owningTaskName = owningTaskName;
 		}
 
 		@Override
@@ -152,6 +159,11 @@ public class BarrierBufferMassiveRandomTest {
 		}
 
 		@Override
+		public String getOwningTaskName() {
+			return owningTaskName;
+		}
+
+		@Override
 		public boolean isFinished() {
 			return false;
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index e62b709..6400a17 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -44,11 +44,18 @@ public class MockInputGate implements InputGate {
 
 	private int closedChannels;
 
+	private final String owningTaskName;
+
 	public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents) {
+		this(pageSize, numChannels, bufferOrEvents, "MockTask");
+	}
+
+	public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents, String owningTaskName) {
 		this.pageSize = pageSize;
 		this.numChannels = numChannels;
 		this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents);
 		this.closed = new boolean[numChannels];
+		this.owningTaskName = owningTaskName;
 	}
 
 	@Override
@@ -62,6 +69,11 @@ public class MockInputGate implements InputGate {
 	}
 
 	@Override
+	public String getOwningTaskName() {
+		return owningTaskName;
+	}
+
+	@Override
 	public boolean isFinished() {
 		return bufferOrEvents.isEmpty();
 	}