You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/13 09:57:38 UTC
[flink] 03/03: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d5ddb0fbdc41fa8230cff7c54b993e62ddef2447
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Jul 31 14:48:58 2018 +0200
[FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
This closes #6470.
---
.../io/network/partition/consumer/InputGate.java | 2 +
.../partition/consumer/SingleInputGate.java | 5 ++
.../network/partition/consumer/UnionInputGate.java | 6 ++
.../flink/streaming/runtime/io/BarrierBuffer.java | 64 +++++++++++++++-------
.../runtime/io/BarrierBufferMassiveRandomTest.java | 12 ++++
.../flink/streaming/runtime/io/MockInputGate.java | 12 ++++
6 files changed, 81 insertions(+), 20 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0413caa..c78abb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -69,6 +69,8 @@ public interface InputGate {
int getNumberOfInputChannels();
+ String getOwningTaskName();
+
boolean isFinished();
void requestPartitions() throws IOException, InterruptedException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index dbef46f..2e7d076 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -274,6 +274,11 @@ public class SingleInputGate implements InputGate {
return 0;
}
+ @Override
+ public String getOwningTaskName() {
+ return owningTaskName;
+ }
+
// ------------------------------------------------------------------------
// Setup/Life-cycle
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 742592a..d3085cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -130,6 +130,12 @@ public class UnionInputGate implements InputGate, InputGateListener {
}
@Override
+ public String getOwningTaskName() {
+ // all input gates have the same owning task
+ return inputGates[0].getOwningTaskName();
+ }
+
+ @Override
public boolean isFinished() {
for (InputGate inputGate : inputGates) {
if (!inputGate.isFinished()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 78852b8..991635a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -213,7 +213,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
private void completeBufferedSequence() throws IOException {
- LOG.debug("Finished feeding back buffered data");
+ LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName());
currentBuffered.cleanup();
currentBuffered = queuedBuffered.pollFirst();
@@ -247,8 +247,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
else if (barrierId > currentCheckpointId) {
// we did not complete the current checkpoint, another started before
- LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
- "Skipping current checkpoint.", barrierId, currentCheckpointId);
+ LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
+ "Skipping current checkpoint.",
+ inputGate.getOwningTaskName(),
+ barrierId,
+ currentCheckpointId);
// let the task know we are not completing this
notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
@@ -279,8 +282,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
- LOG.debug("Received all barriers, triggering checkpoint {} at {}",
- receivedBarrier.getId(), receivedBarrier.getTimestamp());
+ LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
+ inputGate.getOwningTaskName(),
+ receivedBarrier.getId(),
+ receivedBarrier.getTimestamp());
}
releaseBlocksAndResetBarriers();
@@ -309,7 +314,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (barrierId == currentCheckpointId) {
// cancel this alignment
if (LOG.isDebugEnabled()) {
- LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId);
+ LOG.debug("{}: Checkpoint {} canceled, aborting alignment.",
+ inputGate.getOwningTaskName(),
+ barrierId);
}
releaseBlocksAndResetBarriers();
@@ -317,8 +324,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
else if (barrierId > currentCheckpointId) {
// we canceled the next which also cancels the current
- LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
- "Skipping current checkpoint.", barrierId, currentCheckpointId);
+ LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
+ "Skipping current checkpoint.",
+ inputGate.getOwningTaskName(),
+ barrierId,
+ currentCheckpointId);
// this stops the current alignment
releaseBlocksAndResetBarriers();
@@ -347,7 +357,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
latestAlignmentDurationNanos = 0L;
if (LOG.isDebugEnabled()) {
- LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
+ LOG.debug("{}: Checkpoint {} canceled, skipping alignment.",
+ inputGate.getOwningTaskName(),
+ barrierId);
}
notifyAbortOnCancellationBarrier(barrierId);
@@ -401,8 +413,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private void checkSizeLimit() throws Exception {
if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
// exceeded our limit - abort this checkpoint
- LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded",
- currentCheckpointId, maxBufferedBytes);
+ LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
+ inputGate.getOwningTaskName(),
+ currentCheckpointId,
+ maxBufferedBytes);
releaseBlocksAndResetBarriers();
notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes));
@@ -444,7 +458,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
startOfAlignmentTimestamp = System.nanoTime();
if (LOG.isDebugEnabled()) {
- LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.');
+ LOG.debug("{}: Starting stream alignment for checkpoint {}.",
+ inputGate.getOwningTaskName(),
+ checkpointId);
}
}
@@ -470,7 +486,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
numBarriersReceived++;
if (LOG.isDebugEnabled()) {
- LOG.debug("Received barrier from channel " + channelIndex);
+ LOG.debug("{}: Received barrier from channel {}.",
+ inputGate.getOwningTaskName(),
+ channelIndex);
}
}
else {
@@ -483,7 +501,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
* Makes sure the just written data is the next to be consumed.
*/
private void releaseBlocksAndResetBarriers() throws IOException {
- LOG.debug("End of stream alignment, feeding buffered data back");
+ LOG.debug("{}: End of stream alignment, feeding buffered data back.",
+ inputGate.getOwningTaskName());
for (int i = 0; i < blockedChannels.length; i++) {
blockedChannels[i] = false;
@@ -499,8 +518,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
else {
// uncommon case: buffered data pending
// push back the pending data, if we have any
- LOG.debug("Checkpoint skipped via buffered data:" +
- "Pushing back current alignment buffers and feeding back new alignment data first.");
+ LOG.debug("{}: Checkpoint skipped via buffered data:" +
+ "Pushing back current alignment buffers and feeding back new alignment data first.",
+ inputGate.getOwningTaskName());
// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
@@ -513,8 +533,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Size of buffered data: {} bytes",
- currentBuffered == null ? 0L : currentBuffered.size());
+ LOG.debug("{}: Size of buffered data: {} bytes",
+ inputGate.getOwningTaskName(),
+ currentBuffered == null ? 0L : currentBuffered.size());
}
// the next barrier that comes must assume it is the first
@@ -555,7 +576,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
@Override
public String toString() {
- return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
- currentCheckpointId, numBarriersReceived, numClosedChannels);
+ return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d",
+ inputGate.getOwningTaskName(),
+ currentCheckpointId,
+ numBarriersReceived,
+ numClosedChannels);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 6dd1e5e..e968101 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -139,11 +139,18 @@ public class BarrierBufferMassiveRandomTest {
private int currentChannel = 0;
private long c = 0;
+ private final String owningTaskName;
+
public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+ this(bufferPools, barrierGens, "TestTask");
+ }
+
+ public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens, String owningTaskName) {
this.numChannels = bufferPools.length;
this.currentBarriers = new int[numChannels];
this.bufferPools = bufferPools;
this.barrierGens = barrierGens;
+ this.owningTaskName = owningTaskName;
}
@Override
@@ -152,6 +159,11 @@ public class BarrierBufferMassiveRandomTest {
}
@Override
+ public String getOwningTaskName() {
+ return owningTaskName;
+ }
+
+ @Override
public boolean isFinished() {
return false;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index e62b709..6400a17 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -44,11 +44,18 @@ public class MockInputGate implements InputGate {
private int closedChannels;
+ private final String owningTaskName;
+
public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents) {
+ this(pageSize, numChannels, bufferOrEvents, "MockTask");
+ }
+
+ public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents, String owningTaskName) {
this.pageSize = pageSize;
this.numChannels = numChannels;
this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents);
this.closed = new boolean[numChannels];
+ this.owningTaskName = owningTaskName;
}
@Override
@@ -62,6 +69,11 @@ public class MockInputGate implements InputGate {
}
@Override
+ public String getOwningTaskName() {
+ return owningTaskName;
+ }
+
+ @Override
public boolean isFinished() {
return bufferOrEvents.isEmpty();
}