You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/17 16:42:30 UTC
flink git commit: [hotfix] [checkpoints] Enhance debug logging in exactly-once and at-least-once checkpoint stream aligners
Repository: flink
Updated Branches:
refs/heads/master 836fe9786 -> 539880994
[hotfix] [checkpoints] Enhance debug logging in exactly-once and at-least-once checkpoint stream aligners
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53988099
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53988099
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53988099
Branch: refs/heads/master
Commit: 5398809944c2cfbbfe628a5d64cff4558375d29e
Parents: 836fe97
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 17 17:34:09 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 17 17:34:09 2016 +0100
----------------------------------------------------------------------
.../streaming/runtime/io/BarrierBuffer.java | 11 ++++++-
.../streaming/runtime/io/BarrierTracker.java | 31 ++++++++++++++++----
2 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53988099/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0ad9905..0940133 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -199,6 +199,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
private void completeBufferedSequence() throws IOException {
+ LOG.debug("Finished feeding back buffered data");
+
currentBuffered.cleanup();
currentBuffered = queuedBuffered.pollFirst();
if (currentBuffered != null) {
@@ -475,7 +477,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
else {
// uncommon case: buffered data pending
// push back the pending data, if we have any
-
+ LOG.debug("Checkpoint skipped via buffered data:" +
+ "Pushing back current alignment buffers and feeding back new alignment data first.");
+
// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
BufferSpiller.SpilledBufferOrEventSequence bufferedNow = bufferSpiller.rollOverWithNewBuffer();
if (bufferedNow != null) {
@@ -486,6 +490,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Size of buffered data: {} bytes",
+ currentBuffered == null ? 0L : currentBuffered.size());
+ }
+
// the next barrier that comes must assume it is the first
numBarriersReceived = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/53988099/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 72838bc..f497f4b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -27,6 +27,9 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayDeque;
/**
@@ -43,10 +46,14 @@ import java.util.ArrayDeque;
@Internal
public class BarrierTracker implements CheckpointBarrierHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(BarrierTracker.class);
+
/** The tracker tracks a maximum number of checkpoints, for which some, but not all
* barriers have yet arrived. */
private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
-
+
+ // ------------------------------------------------------------------------
+
/** The input gate, to draw the buffers and events from */
private final InputGate inputGate;
@@ -81,10 +88,10 @@ public class BarrierTracker implements CheckpointBarrierHandler {
return next;
}
else if (next.getEvent().getClass() == CheckpointBarrier.class) {
- processBarrier((CheckpointBarrier) next.getEvent());
+ processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
}
else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
- processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent());
+ processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent(), next.getChannelIndex());
}
else {
// some other event
@@ -119,7 +126,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
return 0L;
}
- private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception {
+ private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel trackers
@@ -129,8 +136,11 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
// general path for multiple input channels
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
+ }
- // find the checkpoint barrier in the queue of bending barriers
+ // find the checkpoint barrier in the queue of pending barriers
CheckpointBarrierCount cbc = null;
int pos = 0;
@@ -155,6 +165,10 @@ public class BarrierTracker implements CheckpointBarrierHandler {
// notify the listener
if (!cbc.isAborted()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received all barriers for checkpoint {}", barrierId);
+ }
+
notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
}
}
@@ -176,9 +190,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
}
- private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier) throws Exception {
+
+ private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier, int channelIndex) throws Exception {
final long checkpointId = barrier.getCheckpointId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received cancellation barrier for checkpoint {} from channel {}", checkpointId, channelIndex);
+ }
+
// fast path for single channel trackers
if (totalNumberOfInputChannels == 1) {
notifyAbort(checkpointId);