You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 12:48:00 UTC

[10/10] flink git commit: [FLINK-4379] [checkpoints] Fix minor bug and improve debug logging

[FLINK-4379] [checkpoints] Fix minor bug and improve debug logging


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f8f5eb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f8f5eb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f8f5eb3

Branch: refs/heads/master
Commit: 6f8f5eb3b9ba07cd3bb4d9f7edd43d4b8862acbe
Parents: 53ed6ad
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 29 21:12:38 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 30 12:38:46 2016 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java      | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f8f5eb3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1725eca..88c3ba4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -717,6 +717,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 				cancelables.registerClosable(asyncCheckpointRunnable);
 				asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("{} - finished synchronous part of checkpoint {}. " +
+							"Alignment duration: {} ms, snapshot duration {} ms",
+							getName(), checkpointId, alignmentDurationNanos / 1_000_000, syncDurationMillis);
+				}
+
 				return true;
 			} else {
 				return false;
@@ -998,12 +1005,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				final long asyncEndNanos = System.nanoTime();
 				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
 
-				if (nonPartitionedStateHandles.isEmpty() && keyedStates.isEmpty()) {
-					owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+				if (nonPartitionedStateHandles.isEmpty() && partitioneableStateHandles.isEmpty() && keyedStates.isEmpty()) {
+					owner.getEnvironment().acknowledgeCheckpoint(
+							checkpointId,
 							syncDurationMillies, asyncDurationMillis,
 							bytesBufferedInAlignment, alignmentDurationNanos);
-				} else  {
-
+				} else {
 					CheckpointStateHandles allStateHandles = new CheckpointStateHandles(
 							nonPartitionedStateHandles,
 							partitioneableStateHandles,
@@ -1016,8 +1023,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				}
 
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}. Returning handles on " +
-							"keyed states {}.", checkpointId, name, keyedStates);
+					LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", 
+							owner.getName(), checkpointId, asyncDurationMillis);
 				}
 			}
 			catch (Exception e) {