You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/17 16:42:30 UTC

flink git commit: [hotfix] [checkpoints] Enhance debug logging in exactly-once and at-least-once checkpoint stream aligners

Repository: flink
Updated Branches:
  refs/heads/master 836fe9786 -> 539880994


[hotfix] [checkpoints] Enhance debug logging in exactly-once and at-least-once checkpoint stream aligners


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53988099
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53988099
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53988099

Branch: refs/heads/master
Commit: 5398809944c2cfbbfe628a5d64cff4558375d29e
Parents: 836fe97
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 17 17:34:09 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 17 17:34:09 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/io/BarrierBuffer.java     | 11 ++++++-
 .../streaming/runtime/io/BarrierTracker.java    | 31 ++++++++++++++++----
 2 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53988099/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0ad9905..0940133 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -199,6 +199,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	}
 
 	private void completeBufferedSequence() throws IOException {
+		LOG.debug("Finished feeding back buffered data");
+
 		currentBuffered.cleanup();
 		currentBuffered = queuedBuffered.pollFirst();
 		if (currentBuffered != null) {
@@ -475,7 +477,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		else {
 			// uncommon case: buffered data pending
 			// push back the pending data, if we have any
-			
+			LOG.debug("Checkpoint skipped via buffered data:" +
+					"Pushing back current alignment buffers and feeding back new alignment data first.");
+
 			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
 			BufferSpiller.SpilledBufferOrEventSequence bufferedNow = bufferSpiller.rollOverWithNewBuffer();
 			if (bufferedNow != null) {
@@ -486,6 +490,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			}
 		}
 
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Size of buffered data: {} bytes",
+					currentBuffered == null ? 0L : currentBuffered.size());
+		}
+
 		// the next barrier that comes must assume it is the first
 		numBarriersReceived = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53988099/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 72838bc..f497f4b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -27,6 +27,9 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayDeque;
 
 /**
@@ -43,10 +46,14 @@ import java.util.ArrayDeque;
 @Internal
 public class BarrierTracker implements CheckpointBarrierHandler {
 
+	private static final Logger LOG = LoggerFactory.getLogger(BarrierTracker.class);
+
 	/** The tracker tracks a maximum number of checkpoints, for which some, but not all
 	 * barriers have yet arrived. */
 	private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
-	
+
+	// ------------------------------------------------------------------------
+
 	/** The input gate, to draw the buffers and events from */
 	private final InputGate inputGate;
 	
@@ -81,10 +88,10 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 				return next;
 			}
 			else if (next.getEvent().getClass() == CheckpointBarrier.class) {
-				processBarrier((CheckpointBarrier) next.getEvent());
+				processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
 			}
 			else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
-				processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent());
+				processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent(), next.getChannelIndex());
 			}
 			else {
 				// some other event
@@ -119,7 +126,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		return 0L;
 	}
 
-	private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception {
+	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
 		final long barrierId = receivedBarrier.getId();
 
 		// fast path for single channel trackers
@@ -129,8 +136,11 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		}
 
 		// general path for multiple input channels
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
+		}
 
-		// find the checkpoint barrier in the queue of bending barriers
+		// find the checkpoint barrier in the queue of pending barriers
 		CheckpointBarrierCount cbc = null;
 		int pos = 0;
 
@@ -155,6 +165,10 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 
 				// notify the listener
 				if (!cbc.isAborted()) {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Received all barriers for checkpoint {}", barrierId);
+					}
+
 					notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
 				}
 			}
@@ -176,9 +190,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		}
 	}
 
-	private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier) throws Exception {
+
+	private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier, int channelIndex) throws Exception {
 		final long checkpointId = barrier.getCheckpointId();
 
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Received cancellation barrier for checkpoint {} from channel {}", checkpointId, channelIndex);
+		}
+
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
 			notifyAbort(checkpointId);