Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/30 08:46:49 UTC

[flink] branch release-1.6 updated: [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 53e7472  [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
53e7472 is described below

commit 53e747272a0b8841e763f56d70e223d7de03d7bb
Author: Chesnay <ch...@apache.org>
AuthorDate: Mon Jul 30 10:45:51 2018 +0200

    [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
---
 .../ZooKeeperHighAvailabilityITCase.java           | 107 +++++++++++++++------
 1 file changed, 75 insertions(+), 32 deletions(-)
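
In short, the test was flaky for two reasons: it asserted that the job passes
through the transient RESTARTING and FAILING states, which a status poll can
miss, and it moved checkpoint files with File#renameTo while Flink may delete
superseded checkpoint files concurrently. The patch below replaces the status
assertions with a check on the fullRestarts metric, exposed through a custom
reporter, and makes the file moves tolerant of concurrent deletion.

The move logic boils down to the following pattern (a standalone sketch; the
helper name and parameters are illustrative, not part of the patch):

    import java.io.IOException;
    import java.nio.file.FileVisitResult;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.SimpleFileVisitor;
    import java.nio.file.attribute.BasicFileAttributes;

    /** Moves all "completedCheckpoint*" files from one directory to another. */
    static int moveCompletedCheckpoints(Path from, Path to) throws IOException {
        final int[] moved = {0};
        Files.walkFileTree(from, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                if (file.getFileName().toString().startsWith("completedCheckpoint")) {
                    try {
                        Files.move(file, to.resolve(file.getFileName()));
                        moved[0]++;
                    } catch (IOException ioe) {
                        // benign: Flink may delete superseded checkpoints asynchronously
                    }
                }
                return FileVisitResult.CONTINUE;
            }
        });
        return moved[0];
    }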

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index e02ed01..b83f89e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -31,7 +31,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -56,6 +61,12 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -63,6 +74,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.core.Is.is;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -107,6 +119,10 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
 		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
+		config.setString(
+			ConfigConstants.METRICS_REPORTER_PREFIX + "restarts." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+			RestartReporter.class.getName());
+
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
 		miniClusterResource = new MiniClusterResource(
@@ -184,58 +200,59 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 		// wait until we did some checkpoints
 		waitForCheckpointLatch.await();
 
+		log.debug("Messing with HA directory");
 		// mess with the HA directory so that the job cannot restore
 		File movedCheckpointLocation = TEMPORARY_FOLDER.newFolder();
-		int numCheckpoints = 0;
-		File[] files = haStorageDir.listFiles();
-		assertNotNull(files);
-		for (File file : files) {
-			if (file.getName().startsWith("completedCheckpoint")) {
-				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
-				numCheckpoints++;
+		AtomicInteger numCheckpoints = new AtomicInteger();
+		Files.walkFileTree(haStorageDir.toPath(), new SimpleFileVisitor<Path>() {
+			@Override
+			public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+				if (file.getFileName().toString().startsWith("completedCheckpoint")) {
+					log.debug("Moving original checkpoint file {}.", file);
+					try {
+						Files.move(file, movedCheckpointLocation.toPath().resolve(file.getFileName()));
+						numCheckpoints.incrementAndGet();
+					} catch (IOException ioe) {
+						// previous checkpoint files may be deleted asynchronously
+						log.debug("Exception while moving HA files.", ioe);
+					}
+				}
+				return FileVisitResult.CONTINUE;
 			}
-		}
+		});
+
 		// Note to future developers: This will break when we change Flink to not put the
 		// checkpoint metadata into the HA directory but instead rely on the fact that the
 		// actual checkpoint directory on DFS contains the checkpoint metadata. In this case,
 		// ZooKeeper will only contain a "handle" (read: String) that points to the metadata
 		// in DFS. The likely solution will be that we have to go directly to ZooKeeper, find
 		// out where the checkpoint is stored and mess with that.
-		assertTrue(numCheckpoints > 0);
+		assertTrue(numCheckpoints.get() > 0);
 
+		log.debug("Resuming job");
 		failInCheckpointLatch.trigger();
 
-		// Ensure that we see at least one cycle where the job tries to restart and fails.
-		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
-			() -> clusterClient.getJobStatus(jobID),
-			Time.milliseconds(1),
-			deadline,
-			(jobStatus) -> jobStatus == JobStatus.RESTARTING,
-			TestingUtils.defaultScheduledExecutor());
-		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
-
-		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
-			() -> clusterClient.getJobStatus(jobID),
-			Time.milliseconds(1),
-			deadline,
-			(jobStatus) -> jobStatus == JobStatus.FAILING,
-			TestingUtils.defaultScheduledExecutor());
-		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
+		assertNotNull("fullRestarts metric could not be accessed.", RestartReporter.numRestarts);
+		while (RestartReporter.numRestarts.getValue() < 5 && deadline.hasTimeLeft()) {
+			Thread.sleep(50);
+		}
+		assertThat(RestartReporter.numRestarts.getValue(), is(greaterThan(4L)));
 
 		// move back the HA directory so that the job can restore
 		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
+		log.debug("Restored zookeeper");
 
-		files = movedCheckpointLocation.listFiles();
-		assertNotNull(files);
-		for (File file : files) {
-			if (file.getName().startsWith("completedCheckpoint")) {
-				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
+		Files.walkFileTree(movedCheckpointLocation.toPath(), new SimpleFileVisitor<Path>() {
+			@Override
+			public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+				Files.move(file, haStorageDir.toPath().resolve(file.getFileName()));
+				return FileVisitResult.CONTINUE;
 			}
-		}
+		});
 
 		// now the job should be able to go to RUNNING again and then eventually to FINISHED,
 		// which it only does if it could successfully restore
-		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
 			() -> clusterClient.getJobStatus(jobID),
 			Time.milliseconds(50),
 			deadline,
@@ -324,4 +341,30 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 			}
 		}
 	}
+
+	/**
+	 * Reporter that exposes the {@link NumberOfFullRestartsGauge} metric.
+	 */
+	public static class RestartReporter implements MetricReporter {
+		static volatile NumberOfFullRestartsGauge numRestarts = null;
+
+		@Override
+		public void open(MetricConfig metricConfig) {
+		}
+
+		@Override
+		public void close() {
+		}
+
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String s, MetricGroup metricGroup) {
+			if (metric instanceof NumberOfFullRestartsGauge) {
+				numRestarts = (NumberOfFullRestartsGauge) metric;
+			}
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
+		}
+	}
 }
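
The RestartReporter above matches the gauge by its concrete class. Where the
gauge class is not accessible to the test, matching by metric name works as
well; a minimal sketch of that variant (the class name is hypothetical, the
reporter callbacks have the signatures shown in the patch):

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.Metric;
    import org.apache.flink.metrics.MetricConfig;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.flink.metrics.reporter.MetricReporter;

    /** Test-only reporter that captures the "fullRestarts" gauge by name. */
    public static class CapturingReporter implements MetricReporter {
        static volatile Gauge<?> fullRestarts = null;

        @Override
        public void open(MetricConfig metricConfig) {
        }

        @Override
        public void close() {
        }

        @Override
        public void notifyOfAddedMetric(Metric metric, String name, MetricGroup group) {
            if ("fullRestarts".equals(name) && metric instanceof Gauge) {
                fullRestarts = (Gauge<?>) metric;
            }
        }

        @Override
        public void notifyOfRemovedMetric(Metric metric, String name, MetricGroup group) {
        }
    }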
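
For the scenario flagged in the "Note to future developers", where ZooKeeper
holds only a handle pointing at checkpoint metadata on DFS, the test would
have to read that handle from ZooKeeper itself. Roughly, with the Curator
client (a sketch under the assumption of default HA settings; the znode path
is hypothetical and depends on the configured ZooKeeper root and cluster-id):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    CuratorFramework client = CuratorFrameworkFactory.newClient(
        zkServer.getConnectString(), new ExponentialBackoffRetry(100, 3));
    client.start();
    // hypothetical path: <root>/<cluster-id>/checkpoints/<job-id>
    for (String node : client.getChildren().forPath("/flink/default/checkpoints")) {
        byte[] handle = client.getData().forPath("/flink/default/checkpoints/" + node);
        // deserialize the handle to locate (and mess with) the checkpoint on DFS
    }
    client.close();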