You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text version.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/12/07 10:44:50 UTC

[flink] branch release-1.5 updated: [FLINK-10482] Fix double counting of checkpoint stat

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new ddbef1b  [FLINK-10482] Fix double counting of checkpoint stat
ddbef1b is described below

commit ddbef1be626a60d2d9a6c7c162c806a5d2953818
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Thu Nov 15 18:51:43 2018 +0100

    [FLINK-10482] Fix double counting of checkpoint stat
---
 .../runtime/checkpoint/CheckpointStatsCounts.java  | 24 ++++++++++++++-----
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 17 +++++++-------
 .../checkpoint/CheckpointStatsCountsTest.java      | 27 ++++++++++------------
 3 files changed, 38 insertions(+), 30 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
index dad45eb..9e15aeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -26,6 +29,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * Counts of checkpoints.
  */
 public class CheckpointStatsCounts implements Serializable {
+	private static final Logger LOG = LoggerFactory.getLogger(CheckpointStatsCounts.class);
 
 	private static final long serialVersionUID = -5229425063269482528L;
 
@@ -147,9 +151,8 @@ public class CheckpointStatsCounts implements Serializable {
 	 * {@link #incrementInProgressCheckpoints()}.
 	 */
 	void incrementCompletedCheckpoints() {
-		if (--numInProgressCheckpoints < 0) {
-			throw new IllegalStateException("Incremented the completed number of checkpoints " +
-				"without incrementing the in progress checkpoints before.");
+		if (canDecrementOfInProgressCheckpointsNumber()) {
+			numInProgressCheckpoints--;
 		}
 		numCompletedCheckpoints++;
 	}
@@ -161,9 +164,8 @@ public class CheckpointStatsCounts implements Serializable {
 	 * {@link #incrementInProgressCheckpoints()}.
 	 */
 	void incrementFailedCheckpoints() {
-		if (--numInProgressCheckpoints < 0) {
-			throw new IllegalStateException("Incremented the completed number of checkpoints " +
-				"without incrementing the in progress checkpoints before.");
+		if (canDecrementOfInProgressCheckpointsNumber()) {
+			numInProgressCheckpoints--;
 		}
 		numFailedCheckpoints++;
 	}
@@ -181,4 +183,14 @@ public class CheckpointStatsCounts implements Serializable {
 			numCompletedCheckpoints,
 			numFailedCheckpoints);
 	}
+
+	private boolean canDecrementOfInProgressCheckpointsNumber() {
+		boolean decrementLeadsToNegativeNumber = numInProgressCheckpoints - 1 < 0;
+		if (decrementLeadsToNegativeNumber) {
+			String errorMessage = "Incremented the completed number of checkpoints " +
+				"without incrementing the in progress checkpoints before.";
+			LOG.warn(errorMessage);
+		}
+		return !decrementLeadsToNegativeNumber;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 0b0b82b..0b9aac7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -942,19 +942,18 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		return checkpointCoordinator
 			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
 			.thenApply(CompletedCheckpoint::getExternalPointer)
-			.thenApplyAsync(path -> {
-				if (cancelJob) {
+			.handleAsync((path, throwable) -> {
+				if (throwable != null) {
+					if (cancelJob) {
+						startCheckpointScheduler(checkpointCoordinator);
+					}
+					throw new CompletionException(throwable);
+				} else if (cancelJob) {
 					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
 					cancel(timeout);
 				}
 				return path;
-			}, getMainThreadExecutor())
-			.exceptionally(throwable -> {
-				if (cancelJob) {
-					startCheckpointScheduler(checkpointCoordinator);
-				}
-				throw new CompletionException(throwable);
-			});
+			}, getMainThreadExecutor());
 	}
 
 	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
index cf1e7f7..2d09b46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
@@ -21,15 +21,16 @@ package org.apache.flink.runtime.checkpoint;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 
+/** Test checkpoint statistics counters. */
 public class CheckpointStatsCountsTest {
 
 	/**
 	 * Tests that counts are reported correctly.
 	 */
 	@Test
-	public void testCounts() throws Exception {
+	public void testCounts() {
 		CheckpointStatsCounts counts = new CheckpointStatsCounts();
 		assertEquals(0, counts.getNumberOfRestoredCheckpoints());
 		assertEquals(0, counts.getTotalNumberOfCheckpoints());
@@ -80,19 +81,15 @@ public class CheckpointStatsCountsTest {
 	 * incrementing the in progress checkpoints before throws an Exception.
 	 */
 	@Test
-	public void testCompleteOrFailWithoutInProgressCheckpoint() throws Exception {
+	public void testCompleteOrFailWithoutInProgressCheckpoint() {
 		CheckpointStatsCounts counts = new CheckpointStatsCounts();
-		try {
-			counts.incrementCompletedCheckpoints();
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
-		}
-
-		try {
-			counts.incrementFailedCheckpoints();
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
-		}
+		counts.incrementCompletedCheckpoints();
+		assertTrue("Number of checkpoints in progress should never be negative",
+			counts.getNumberOfInProgressCheckpoints() >= 0);
+
+		counts.incrementFailedCheckpoints();
+		assertTrue("Number of checkpoints in progress should never be negative",
+			counts.getNumberOfInProgressCheckpoints() >= 0);
 	}
 
 	/**
@@ -100,7 +97,7 @@ public class CheckpointStatsCountsTest {
 	 * parent.
 	 */
 	@Test
-	public void testCreateSnapshot() throws Exception {
+	public void testCreateSnapshot() {
 		CheckpointStatsCounts counts = new CheckpointStatsCounts();
 		counts.incrementRestoredCheckpoints();
 		counts.incrementRestoredCheckpoints();