You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/16 12:47:03 UTC

[flink] branch release-1.9 updated: [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new fc7aeff  [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
fc7aeff is described below

commit fc7aeffd83d390d9a9452e8fcc55aec0c9d23cac
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Fri Aug 16 16:06:32 2019 +0800

    [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
    
    Waiting checkpoint id increasing to 5 can not guarantee there must be
    state data included. An empty checkpoint would fail the restoration
    checking.
    
    This closes #9461.
---
 .../ZooKeeperHighAvailabilityITCase.java           | 31 +++++++++++++++++++---
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 7389613..c34caa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateBackend;
@@ -78,6 +79,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.number.OrderingComparison.greaterThan;
@@ -333,7 +335,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 
 	private static class CheckpointBlockingFunction
 			extends RichMapFunction<String, String>
-			implements CheckpointedFunction {
+			implements CheckpointedFunction, CheckpointListener {
 
 		// verify that we only call initializeState()
 		// once with isRestored() == false. All other invocations must have isRestored() == true. This
@@ -352,6 +354,14 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 
 		static AtomicBoolean failedAlready = new AtomicBoolean(false);
 
+		static AtomicBoolean stateRecorded = new AtomicBoolean(false);
+
+		// for checkpoint with a id not less than this id, it includes state data
+		static AtomicLong minimalCheckpointIdIncludingData = new AtomicLong(Long.MAX_VALUE);
+
+		// make sure there is at least one completed checkpoint including state data
+		static AtomicBoolean checkpointCompletedIncludingData = new AtomicBoolean(false);
+
 		// also have some state to write to the checkpoint
 		private final ValueStateDescriptor<String> stateDescriptor =
 			new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
@@ -359,20 +369,26 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 		@Override
 		public String map(String value) throws Exception {
 			getRuntimeContext().getState(stateDescriptor).update("42");
+			stateRecorded.compareAndSet(false, true);
 			return value;
 		}
 
 		@Override
 		public void snapshotState(FunctionSnapshotContext context) throws Exception {
-			if (context.getCheckpointId() > 5) {
+			if (stateRecorded.get()) {
+				minimalCheckpointIdIncludingData.compareAndSet(Long.MAX_VALUE,
+					context.getCheckpointId());
+			}
+			if (checkpointCompletedIncludingData.get()) {
+				// there is a checkpoint completed with state data, we can trigger the failure now
 				waitForCheckpointLatch.trigger();
 				failInCheckpointLatch.await();
 				if (!failedAlready.getAndSet(true)) {
 					throw new RuntimeException("Failing on purpose.");
 				} else {
 					// make sure there would be no more successful checkpoint before job failing
-					// otherwise there might be a successful checkpoint 7 which is unexpected
-					// we expect checkpoint 5 is the last successful one before ha storage recovering
+					// otherwise there might be an unexpected successful checkpoint even
+					// CheckpointFailureManager has decided to fail the job
 					blockSnapshotLatch.await();
 				}
 			}
@@ -395,6 +411,13 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 				}
 			}
 		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			if (checkpointId >= minimalCheckpointIdIncludingData.get()) {
+				checkpointCompletedIncludingData.compareAndSet(false, true);
+			}
+		}
 	}
 
 	/**