Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/30 08:46:49 UTC
[flink] branch release-1.6 updated: [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 53e7472 [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
53e7472 is described below
commit 53e747272a0b8841e763f56d70e223d7de03d7bb
Author: Chesnay <ch...@apache.org>
AuthorDate: Mon Jul 30 10:45:51 2018 +0200
[FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
---
.../ZooKeeperHighAvailabilityITCase.java | 107 +++++++++++++++------
1 file changed, 75 insertions(+), 32 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index e02ed01..b83f89e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -31,7 +31,12 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -56,6 +61,12 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -63,6 +74,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.core.Is.is;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -107,6 +119,10 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config.setString(
+ ConfigConstants.METRICS_REPORTER_PREFIX + "restarts." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+ RestartReporter.class.getName());
+
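For context: ConfigConstants.METRICS_REPORTER_PREFIX is "metrics.reporter." and METRICS_REPORTER_CLASS_SUFFIX is "class", so the added lines assemble the key "metrics.reporter.restarts.class" and register a reporter named "restarts", backed by the RestartReporter class added at the end of this patch. Assuming those constant values from Flink 1.6, the equivalent flink-conf.yaml entry would look roughly like:

    metrics.reporter.restarts.class: org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase$RestartReporter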
// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
@@ -184,58 +200,59 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
// wait until we did some checkpoints
waitForCheckpointLatch.await();
+ log.debug("Messing with HA directory");
// mess with the HA directory so that the job cannot restore
File movedCheckpointLocation = TEMPORARY_FOLDER.newFolder();
- int numCheckpoints = 0;
- File[] files = haStorageDir.listFiles();
- assertNotNull(files);
- for (File file : files) {
- if (file.getName().startsWith("completedCheckpoint")) {
- assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
- numCheckpoints++;
+ AtomicInteger numCheckpoints = new AtomicInteger();
+ Files.walkFileTree(haStorageDir.toPath(), new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+ if (file.getFileName().toString().startsWith("completedCheckpoint")) {
+ log.debug("Moving original checkpoint file {}.", file);
+ try {
+ Files.move(file, movedCheckpointLocation.toPath().resolve(file.getFileName()));
+ numCheckpoints.incrementAndGet();
+ } catch (IOException ioe) {
+ // previous checkpoint files may be deleted asynchronously
+ log.debug("Exception while moving HA files.", ioe);
+ }
+ }
+ return FileVisitResult.CONTINUE;
}
- }
+ });
+
// Note to future developers: This will break when we change Flink to not put the
// checkpoint metadata into the HA directory but instead rely on the fact that the
// actual checkpoint directory on DFS contains the checkpoint metadata. In this case,
// ZooKeeper will only contain a "handle" (read: String) that points to the metadata
// in DFS. The likely solution will be that we have to go directly to ZooKeeper, find
// out where the checkpoint is stored and mess with that.
- assertTrue(numCheckpoints > 0);
+ assertTrue(numCheckpoints.get() > 0);
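Why the visitor instead of the old listFiles()/renameTo() loop: subsumed checkpoint files are deleted asynchronously by the job, so a file returned by listFiles() could vanish before renameTo() ran, renameTo() would then return false, and the assertTrue around it would fail the test spuriously. The visitor treats a failed move as benign and merely logs it. A self-contained sketch of the same tolerant-move pattern (class and method names hypothetical):

    import java.io.IOException;
    import java.nio.file.FileVisitResult;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.SimpleFileVisitor;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.util.concurrent.atomic.AtomicInteger;

    final class CheckpointFileUtils {

        /**
         * Moves all "completedCheckpoint*" files from 'from' to 'to', skipping files
         * that are deleted concurrently. Returns the number of files actually moved.
         */
        static int moveCompletedCheckpoints(Path from, Path to) throws IOException {
            final AtomicInteger moved = new AtomicInteger();
            Files.walkFileTree(from, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                    if (file.getFileName().toString().startsWith("completedCheckpoint")) {
                        try {
                            Files.move(file, to.resolve(file.getFileName()));
                            moved.incrementAndGet();
                        } catch (IOException ioe) {
                            // checkpoint files may be deleted under us at any time; skip them
                        }
                    }
                    return FileVisitResult.CONTINUE;
                }
            });
            return moved.get();
        }
    }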
+ log.debug("Resuming job");
failInCheckpointLatch.trigger();
- // Ensure that we see at least one cycle where the job tries to restart and fails.
- CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
- () -> clusterClient.getJobStatus(jobID),
- Time.milliseconds(1),
- deadline,
- (jobStatus) -> jobStatus == JobStatus.RESTARTING,
- TestingUtils.defaultScheduledExecutor());
- assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
-
- jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
- () -> clusterClient.getJobStatus(jobID),
- Time.milliseconds(1),
- deadline,
- (jobStatus) -> jobStatus == JobStatus.FAILING,
- TestingUtils.defaultScheduledExecutor());
- assertEquals(JobStatus.FAILING, jobStatusFuture.get());
+ assertNotNull("fullRestarts metric could not be accessed.", RestartReporter.numRestarts);
+ while (RestartReporter.numRestarts.getValue() < 5 && deadline.hasTimeLeft()) {
+ Thread.sleep(50);
+ }
+ assertThat(RestartReporter.numRestarts.getValue(), is(greaterThan(4L)));
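This replaces the two status polls deleted above: RESTARTING and FAILING are transient states, so a poll-based check could miss them entirely and time out even though the job was restarting as intended. Waiting on the fullRestarts gauge counts the restart cycles directly; greaterThan(4L) asserts that at least five full restarts happened. A minimal sketch of the wait-for-gauge pattern, assuming Flink's Gauge and Deadline types (helper name hypothetical):

    import org.apache.flink.api.common.time.Deadline;
    import org.apache.flink.metrics.Gauge;

    final class MetricTestUtils {

        /** Polls the gauge until it reaches the threshold or the deadline expires. */
        static boolean waitForGauge(Gauge<Long> gauge, long threshold, Deadline deadline)
                throws InterruptedException {
            while (gauge.getValue() < threshold && deadline.hasTimeLeft()) {
                Thread.sleep(50L); // coarse polling keeps the busy-wait cheap
            }
            return gauge.getValue() >= threshold;
        }
    }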
// move back the HA directory so that the job can restore
CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
+ log.debug("Restored zookeeper");
- files = movedCheckpointLocation.listFiles();
- assertNotNull(files);
- for (File file : files) {
- if (file.getName().startsWith("completedCheckpoint")) {
- assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
+ Files.walkFileTree(movedCheckpointLocation.toPath(), new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ Files.move(file, haStorageDir.toPath().resolve(file.getFileName()));
+ return FileVisitResult.CONTINUE;
}
- }
+ });
// now the job should be able to go to RUNNING again and then eventually to FINISHED,
// which it only does if it could successfully restore
- jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
() -> clusterClient.getJobStatus(jobID),
Time.milliseconds(50),
deadline,
@@ -324,4 +341,30 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
}
}
}
+
+ /**
+ * Reporter that exposes the {@link NumberOfFullRestartsGauge} metric.
+ */
+ public static class RestartReporter implements MetricReporter {
+ static volatile NumberOfFullRestartsGauge numRestarts = null;
+
+ @Override
+ public void open(MetricConfig metricConfig) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void notifyOfAddedMetric(Metric metric, String s, MetricGroup metricGroup) {
+ if (metric instanceof NumberOfFullRestartsGauge) {
+ numRestarts = (NumberOfFullRestartsGauge) metric;
+ }
+ }
+
+ @Override
+ public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
+ }
+ }
}
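A note on the RestartReporter design: metric reporters are instantiated reflectively by the cluster from the configuration entry added above, so the test never holds a reference to the reporter instance. The static volatile numRestarts field is therefore the hand-off point: the reporter publishes the NumberOfFullRestartsGauge through it, and volatile makes that write visible to the test thread. Filtering in notifyOfAddedMetric by the gauge's concrete type is sufficient here because the test runs a single job, so exactly one metric of that type is registered.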