You are viewing a plain text version of this content. The canonical link for it is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/12/07 10:44:50 UTC
[flink] branch release-1.5 updated: [FLINK-10482] Fix double counting of checkpoint stat
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push:
new ddbef1b [FLINK-10482] Fix double counting of checkpoint stat
ddbef1b is described below
commit ddbef1be626a60d2d9a6c7c162c806a5d2953818
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Thu Nov 15 18:51:43 2018 +0100
[FLINK-10482] Fix double counting of checkpoint stat
---
.../runtime/checkpoint/CheckpointStatsCounts.java | 24 ++++++++++++++-----
.../apache/flink/runtime/jobmaster/JobMaster.java | 17 +++++++-------
.../checkpoint/CheckpointStatsCountsTest.java | 27 ++++++++++------------
3 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
index dad45eb..9e15aeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.checkpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -26,6 +29,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* Counts of checkpoints.
*/
public class CheckpointStatsCounts implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointStatsCounts.class);
private static final long serialVersionUID = -5229425063269482528L;
@@ -147,9 +151,8 @@ public class CheckpointStatsCounts implements Serializable {
* {@link #incrementInProgressCheckpoints()}.
*/
void incrementCompletedCheckpoints() {
- if (--numInProgressCheckpoints < 0) {
- throw new IllegalStateException("Incremented the completed number of checkpoints " +
- "without incrementing the in progress checkpoints before.");
+ if (canDecrementOfInProgressCheckpointsNumber()) {
+ numInProgressCheckpoints--;
}
numCompletedCheckpoints++;
}
@@ -161,9 +164,8 @@ public class CheckpointStatsCounts implements Serializable {
* {@link #incrementInProgressCheckpoints()}.
*/
void incrementFailedCheckpoints() {
- if (--numInProgressCheckpoints < 0) {
- throw new IllegalStateException("Incremented the completed number of checkpoints " +
- "without incrementing the in progress checkpoints before.");
+ if (canDecrementOfInProgressCheckpointsNumber()) {
+ numInProgressCheckpoints--;
}
numFailedCheckpoints++;
}
@@ -181,4 +183,14 @@ public class CheckpointStatsCounts implements Serializable {
numCompletedCheckpoints,
numFailedCheckpoints);
}
+
+ private boolean canDecrementOfInProgressCheckpointsNumber() {
+ boolean decrementLeadsToNegativeNumber = numInProgressCheckpoints - 1 < 0;
+ if (decrementLeadsToNegativeNumber) {
+ String errorMessage = "Incremented the completed number of checkpoints " +
+ "without incrementing the in progress checkpoints before.";
+ LOG.warn(errorMessage);
+ }
+ return !decrementLeadsToNegativeNumber;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 0b0b82b..0b9aac7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -942,19 +942,18 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
return checkpointCoordinator
.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
.thenApply(CompletedCheckpoint::getExternalPointer)
- .thenApplyAsync(path -> {
- if (cancelJob) {
+ .handleAsync((path, throwable) -> {
+ if (throwable != null) {
+ if (cancelJob) {
+ startCheckpointScheduler(checkpointCoordinator);
+ }
+ throw new CompletionException(throwable);
+ } else if (cancelJob) {
log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
cancel(timeout);
}
return path;
- }, getMainThreadExecutor())
- .exceptionally(throwable -> {
- if (cancelJob) {
- startCheckpointScheduler(checkpointCoordinator);
- }
- throw new CompletionException(throwable);
- });
+ }, getMainThreadExecutor());
}
private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
index cf1e7f7..2d09b46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
@@ -21,15 +21,16 @@ package org.apache.flink.runtime.checkpoint;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+/** Test checkpoint statistics counters. */
public class CheckpointStatsCountsTest {
/**
* Tests that counts are reported correctly.
*/
@Test
- public void testCounts() throws Exception {
+ public void testCounts() {
CheckpointStatsCounts counts = new CheckpointStatsCounts();
assertEquals(0, counts.getNumberOfRestoredCheckpoints());
assertEquals(0, counts.getTotalNumberOfCheckpoints());
@@ -80,19 +81,15 @@ public class CheckpointStatsCountsTest {
* incrementing the in progress checkpoints before throws an Exception.
*/
@Test
- public void testCompleteOrFailWithoutInProgressCheckpoint() throws Exception {
+ public void testCompleteOrFailWithoutInProgressCheckpoint() {
CheckpointStatsCounts counts = new CheckpointStatsCounts();
- try {
- counts.incrementCompletedCheckpoints();
- fail("Did not throw expected Exception");
- } catch (IllegalStateException ignored) {
- }
-
- try {
- counts.incrementFailedCheckpoints();
- fail("Did not throw expected Exception");
- } catch (IllegalStateException ignored) {
- }
+ counts.incrementCompletedCheckpoints();
+ assertTrue("Number of checkpoints in progress should never be negative",
+ counts.getNumberOfInProgressCheckpoints() >= 0);
+
+ counts.incrementFailedCheckpoints();
+ assertTrue("Number of checkpoints in progress should never be negative",
+ counts.getNumberOfInProgressCheckpoints() >= 0);
}
/**
@@ -100,7 +97,7 @@ public class CheckpointStatsCountsTest {
* parent.
*/
@Test
- public void testCreateSnapshot() throws Exception {
+ public void testCreateSnapshot() {
CheckpointStatsCounts counts = new CheckpointStatsCounts();
counts.incrementRestoredCheckpoints();
counts.incrementRestoredCheckpoints();