You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 12:48:00 UTC
[10/10] flink git commit: [FLINK-4379] [checkpoints] Fix minor bug
and improve debug logging
[FLINK-4379] [checkpoints] Fix minor bug and improve debug logging
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f8f5eb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f8f5eb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f8f5eb3
Branch: refs/heads/master
Commit: 6f8f5eb3b9ba07cd3bb4d9f7edd43d4b8862acbe
Parents: 53ed6ad
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 29 21:12:38 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 30 12:38:46 2016 +0200
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6f8f5eb3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1725eca..88c3ba4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -717,6 +717,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
cancelables.registerClosable(asyncCheckpointRunnable);
asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} - finished synchronous part of checkpoint {}." +
+ "Alignment duration: {} ms, snapshot duration {} ms",
+ getName(), checkpointId, alignmentDurationNanos / 1_000_000, syncDurationMillis);
+ }
+
return true;
} else {
return false;
@@ -998,12 +1005,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
- if (nonPartitionedStateHandles.isEmpty() && keyedStates.isEmpty()) {
- owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+ if (nonPartitionedStateHandles.isEmpty() && partitioneableStateHandles.isEmpty() && keyedStates.isEmpty()) {
+ owner.getEnvironment().acknowledgeCheckpoint(
+ checkpointId,
syncDurationMillies, asyncDurationMillis,
bytesBufferedInAlignment, alignmentDurationNanos);
- } else {
-
+ } else {
CheckpointStateHandles allStateHandles = new CheckpointStateHandles(
nonPartitionedStateHandles,
partitioneableStateHandles,
@@ -1016,8 +1023,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}. Returning handles on " +
- "keyed states {}.", checkpointId, name, keyedStates);
+ LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
+ owner.getName(), checkpointId, asyncDurationMillis);
}
}
catch (Exception e) {