You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/07/30 08:45:52 UTC

[GitHub] zentol closed pull request #6395: [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase

zentol closed pull request #6395: [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
URL: https://github.com/apache/flink/pull/6395
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index e02ed010242..b83f89e4ca5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -31,7 +31,12 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -56,6 +61,12 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -63,6 +74,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.core.Is.is;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -107,6 +119,10 @@ public static void setup() throws Exception {
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
 		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
+		config.setString(
+			ConfigConstants.METRICS_REPORTER_PREFIX + "restarts." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+			RestartReporter.class.getName());
+
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
 		miniClusterResource = new MiniClusterResource(
@@ -184,58 +200,59 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
 		// wait until we did some checkpoints
 		waitForCheckpointLatch.await();
 
+		log.debug("Messing with HA directory");
 		// mess with the HA directory so that the job cannot restore
 		File movedCheckpointLocation = TEMPORARY_FOLDER.newFolder();
-		int numCheckpoints = 0;
-		File[] files = haStorageDir.listFiles();
-		assertNotNull(files);
-		for (File file : files) {
-			if (file.getName().startsWith("completedCheckpoint")) {
-				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
-				numCheckpoints++;
+		AtomicInteger numCheckpoints = new AtomicInteger();
+		Files.walkFileTree(haStorageDir.toPath(), new SimpleFileVisitor<Path>() {
+			@Override
+			public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+				if (file.getFileName().toString().startsWith("completedCheckpoint")) {
+					log.debug("Moving original checkpoint file {}.", file);
+					try {
+						Files.move(file, movedCheckpointLocation.toPath().resolve(file.getFileName()));
+						numCheckpoints.incrementAndGet();
+					} catch (IOException ioe) {
+						// previous checkpoint files may be deleted asynchronously
+						log.debug("Exception while moving HA files.", ioe);
+					}
+				}
+				return FileVisitResult.CONTINUE;
 			}
-		}
+		});
+
 		// Note to future developers: This will break when we change Flink to not put the
 		// checkpoint metadata into the HA directory but instead rely on the fact that the
 		// actual checkpoint directory on DFS contains the checkpoint metadata. In this case,
 		// ZooKeeper will only contain a "handle" (read: String) that points to the metadata
 		// in DFS. The likely solution will be that we have to go directly to ZooKeeper, find
 		// out where the checkpoint is stored and mess with that.
-		assertTrue(numCheckpoints > 0);
+		assertTrue(numCheckpoints.get() > 0);
 
+		log.debug("Resuming job");
 		failInCheckpointLatch.trigger();
 
-		// Ensure that we see at least one cycle where the job tries to restart and fails.
-		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
-			() -> clusterClient.getJobStatus(jobID),
-			Time.milliseconds(1),
-			deadline,
-			(jobStatus) -> jobStatus == JobStatus.RESTARTING,
-			TestingUtils.defaultScheduledExecutor());
-		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
-
-		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
-			() -> clusterClient.getJobStatus(jobID),
-			Time.milliseconds(1),
-			deadline,
-			(jobStatus) -> jobStatus == JobStatus.FAILING,
-			TestingUtils.defaultScheduledExecutor());
-		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
+		assertNotNull("fullRestarts metric could not be accessed.", RestartReporter.numRestarts);
+		while (RestartReporter.numRestarts.getValue() < 5 && deadline.hasTimeLeft()) {
+			Thread.sleep(50);
+		}
+		assertThat(RestartReporter.numRestarts.getValue(), is(greaterThan(4L)));
 
 		// move back the HA directory so that the job can restore
 		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
+		log.debug("Restored zookeeper");
 
-		files = movedCheckpointLocation.listFiles();
-		assertNotNull(files);
-		for (File file : files) {
-			if (file.getName().startsWith("completedCheckpoint")) {
-				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
+		Files.walkFileTree(movedCheckpointLocation.toPath(), new SimpleFileVisitor<Path>() {
+			@Override
+			public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+				Files.move(file, haStorageDir.toPath().resolve(file.getFileName()));
+				return FileVisitResult.CONTINUE;
 			}
-		}
+		});
 
 		// now the job should be able to go to RUNNING again and then eventually to FINISHED,
 		// which it only does if it could successfully restore
-		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
 			() -> clusterClient.getJobStatus(jobID),
 			Time.milliseconds(50),
 			deadline,
@@ -324,4 +341,30 @@ public void initializeState(FunctionInitializationContext context) {
 			}
 		}
 	}
+
+	/**
+	 * Reporter that exposes the {@link NumberOfFullRestartsGauge} metric.
+	 */
+	public static class RestartReporter implements MetricReporter {
+		static volatile NumberOfFullRestartsGauge numRestarts = null;
+
+		@Override
+		public void open(MetricConfig metricConfig) {
+		}
+
+		@Override
+		public void close() {
+		}
+
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String s, MetricGroup metricGroup) {
+			if (metric instanceof NumberOfFullRestartsGauge) {
+				numRestarts = (NumberOfFullRestartsGauge) metric;
+			}
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
+		}
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services