You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/07 10:46:07 UTC

[GitHub] tillrohrmann closed pull request #7118: [FLINK-10482] Fix double counting of checkpoint stat

tillrohrmann closed pull request #7118: [FLINK-10482] Fix double counting of checkpoint stat
URL: https://github.com/apache/flink/pull/7118
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, it is
reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
index dad45eb669c..9e15aebd048 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -26,6 +29,7 @@
  * Counts of checkpoints.
  */
 public class CheckpointStatsCounts implements Serializable {
+	private static final Logger LOG = LoggerFactory.getLogger(CheckpointStatsCounts.class);
 
 	private static final long serialVersionUID = -5229425063269482528L;
 
@@ -147,9 +151,8 @@ void incrementInProgressCheckpoints() {
 	 * {@link #incrementInProgressCheckpoints()}.
 	 */
 	void incrementCompletedCheckpoints() {
-		if (--numInProgressCheckpoints < 0) {
-			throw new IllegalStateException("Incremented the completed number of checkpoints " +
-				"without incrementing the in progress checkpoints before.");
+		if (canDecrementOfInProgressCheckpointsNumber()) {
+			numInProgressCheckpoints--;
 		}
 		numCompletedCheckpoints++;
 	}
@@ -161,9 +164,8 @@ void incrementCompletedCheckpoints() {
 	 * {@link #incrementInProgressCheckpoints()}.
 	 */
 	void incrementFailedCheckpoints() {
-		if (--numInProgressCheckpoints < 0) {
-			throw new IllegalStateException("Incremented the completed number of checkpoints " +
-				"without incrementing the in progress checkpoints before.");
+		if (canDecrementOfInProgressCheckpointsNumber()) {
+			numInProgressCheckpoints--;
 		}
 		numFailedCheckpoints++;
 	}
@@ -181,4 +183,14 @@ CheckpointStatsCounts createSnapshot() {
 			numCompletedCheckpoints,
 			numFailedCheckpoints);
 	}
+
+	private boolean canDecrementOfInProgressCheckpointsNumber() {
+		boolean decrementLeadsToNegativeNumber = numInProgressCheckpoints - 1 < 0;
+		if (decrementLeadsToNegativeNumber) {
+			String errorMessage = "Incremented the completed number of checkpoints " +
+				"without incrementing the in progress checkpoints before.";
+			LOG.warn(errorMessage);
+		}
+		return !decrementLeadsToNegativeNumber;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5d2d363cf71..54c14af8983 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -962,19 +962,18 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
 		return checkpointCoordinator
 			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
 			.thenApply(CompletedCheckpoint::getExternalPointer)
-			.thenApplyAsync(path -> {
-				if (cancelJob) {
+			.handleAsync((path, throwable) -> {
+				if (throwable != null) {
+					if (cancelJob) {
+						startCheckpointScheduler(checkpointCoordinator);
+					}
+					throw new CompletionException(throwable);
+				} else if (cancelJob) {
 					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
 					cancel(timeout);
 				}
 				return path;
-			}, getMainThreadExecutor())
-			.exceptionally(throwable -> {
-				if (cancelJob) {
-					startCheckpointScheduler(checkpointCoordinator);
-				}
-				throw new CompletionException(throwable);
-			});
+			}, getMainThreadExecutor());
 	}
 
 	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
index cf1e7f7f82d..2d09b46464f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
@@ -21,15 +21,16 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 
+/** Test checkpoint statistics counters. */
 public class CheckpointStatsCountsTest {
 
 	/**
 	 * Tests that counts are reported correctly.
 	 */
 	@Test
-	public void testCounts() throws Exception {
+	public void testCounts() {
 		CheckpointStatsCounts counts = new CheckpointStatsCounts();
 		assertEquals(0, counts.getNumberOfRestoredCheckpoints());
 		assertEquals(0, counts.getTotalNumberOfCheckpoints());
@@ -80,19 +81,15 @@ public void testCounts() throws Exception {
 	 * incrementing the in progress checkpoints before throws an Exception.
 	 */
 	@Test
-	public void testCompleteOrFailWithoutInProgressCheckpoint() throws Exception {
+	public void testCompleteOrFailWithoutInProgressCheckpoint() {
 		CheckpointStatsCounts counts = new CheckpointStatsCounts();
-		try {
-			counts.incrementCompletedCheckpoints();
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
-		}
-
-		try {
-			counts.incrementFailedCheckpoints();
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
-		}
+		counts.incrementCompletedCheckpoints();
+		assertTrue("Number of checkpoints in progress should never be negative",
+			counts.getNumberOfInProgressCheckpoints() >= 0);
+
+		counts.incrementFailedCheckpoints();
+		assertTrue("Number of checkpoints in progress should never be negative",
+			counts.getNumberOfInProgressCheckpoints() >= 0);
 	}
 
 	/**
@@ -100,7 +97,7 @@ public void testCompleteOrFailWithoutInProgressCheckpoint() throws Exception {
 	 * parent.
 	 */
 	@Test
-	public void testCreateSnapshot() throws Exception {
+	public void testCreateSnapshot() {
 		CheckpointStatsCounts counts = new CheckpointStatsCounts();
 		counts.incrementRestoredCheckpoints();
 		counts.incrementRestoredCheckpoints();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services