You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/16 12:47:03 UTC
[flink] branch release-1.9 updated: [FLINK-9900][tests] Harden
ZooKeeperHighAvailabilityITCase
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new fc7aeff [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
fc7aeff is described below
commit fc7aeffd83d390d9a9452e8fcc55aec0c9d23cac
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Fri Aug 16 16:06:32 2019 +0800
[FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
Waiting for the checkpoint id to reach 5 does not guarantee that the
checkpoint includes state data. An empty checkpoint would fail the
restoration check.
This closes #9461.
---
.../ZooKeeperHighAvailabilityITCase.java | 31 +++++++++++++++++++---
1 file changed, 27 insertions(+), 4 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 7389613..c34caa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
@@ -78,6 +79,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.number.OrderingComparison.greaterThan;
@@ -333,7 +335,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
private static class CheckpointBlockingFunction
extends RichMapFunction<String, String>
- implements CheckpointedFunction {
+ implements CheckpointedFunction, CheckpointListener {
// verify that we only call initializeState()
// once with isRestored() == false. All other invocations must have isRestored() == true. This
@@ -352,6 +354,14 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
static AtomicBoolean failedAlready = new AtomicBoolean(false);
+ static AtomicBoolean stateRecorded = new AtomicBoolean(false);
+
+ // any checkpoint with an id not less than this id includes state data
+ static AtomicLong minimalCheckpointIdIncludingData = new AtomicLong(Long.MAX_VALUE);
+
+ // make sure there is at least one completed checkpoint including state data
+ static AtomicBoolean checkpointCompletedIncludingData = new AtomicBoolean(false);
+
// also have some state to write to the checkpoint
private final ValueStateDescriptor<String> stateDescriptor =
new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
@@ -359,20 +369,26 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
@Override
public String map(String value) throws Exception {
getRuntimeContext().getState(stateDescriptor).update("42");
+ stateRecorded.compareAndSet(false, true);
return value;
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
- if (context.getCheckpointId() > 5) {
+ if (stateRecorded.get()) {
+ minimalCheckpointIdIncludingData.compareAndSet(Long.MAX_VALUE,
+ context.getCheckpointId());
+ }
+ if (checkpointCompletedIncludingData.get()) {
+ // there is a checkpoint completed with state data, we can trigger the failure now
waitForCheckpointLatch.trigger();
failInCheckpointLatch.await();
if (!failedAlready.getAndSet(true)) {
throw new RuntimeException("Failing on purpose.");
} else {
// make sure there would be no more successful checkpoint before job failing
- // otherwise there might be a successful checkpoint 7 which is unexpected
- // we expect checkpoint 5 is the last successful one before ha storage recovering
+ // otherwise there might be an unexpected successful checkpoint even though
+ // CheckpointFailureManager has decided to fail the job
blockSnapshotLatch.await();
}
}
@@ -395,6 +411,13 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
}
}
}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ if (checkpointId >= minimalCheckpointIdIncludingData.get()) {
+ checkpointCompletedIncludingData.compareAndSet(false, true);
+ }
+ }
}
/**