You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/23 13:23:40 UTC
[flink] branch master updated: [FLINK-11351][tests] Port
JobManagerCleanupITCase to new code base
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 960feb4 [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base
960feb4 is described below
commit 960feb4437c439084cfb317dd127645cfd176578
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 17 16:45:06 2019 +0100
[FLINK-11351][tests] Port JobManagerCleanupITCase to new code base
This commit renames JobManagerCleanupITCase to BlobsCleanupITCase and uses
the MiniCluster instead of the TestingCluster to execute the tests.
This closes #7524.
---
.../flink/runtime/minicluster/MiniCluster.java | 7 +
.../runtime/jobmanager/BlobsCleanupITCase.java | 272 ++++++++++++++++++
.../jobmanager/JobManagerCleanupITCase.java | 304 ---------------------
3 files changed, 279 insertions(+), 304 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 80f8c1c..d0b4219 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -202,6 +202,13 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
}
+ public ClusterInformation getClusterInformation() {
+ synchronized (lock) {
+ checkState(running, "MiniCluster is not yet running.");
+ return new ClusterInformation("localhost", blobServer.getPort());
+ }
+ }
+
public HighAvailabilityServices getHighAvailabilityServices() {
synchronized (lock) {
checkState(running, "MiniCluster is not yet running.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
new file mode 100644
index 0000000..4e53b4e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class BlobsCleanupITCase extends TestLogger {
+
+ private static final long RETRY_INTERVAL = 100L;
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ private static MiniClusterResource miniClusterResource;
+
+ private static UnmodifiableConfiguration configuration;
+
+ private static File blobBaseDir;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ blobBaseDir = TEMPORARY_FOLDER.newFolder();
+
+ Configuration cfg = new Configuration();
+ cfg.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
+ cfg.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
+ cfg.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+ // BLOBs are deleted from BlobCache between 1s and 2s after last reference
+ // -> the BlobCache may still have the BLOB or not (let's test both cases randomly)
+ cfg.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+ configuration = new UnmodifiableConfiguration(cfg);
+
+ miniClusterResource = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(2)
+ .setNumberTaskManagers(1)
+ .setConfiguration(configuration)
+ .build());
+
+ miniClusterResource.before();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (miniClusterResource != null) {
+ miniClusterResource.after();
+ }
+ }
+
+ /**
+ * Specifies which test case to run in {@link #testBlobServerCleanup(TestCase)}.
+ */
+ private enum TestCase {
+ JOB_FINISHES_SUCESSFULLY,
+ JOB_IS_CANCELLED,
+ JOB_FAILS,
+ JOB_SUBMISSION_FAILS
+ }
+
+ /**
+ * Test cleanup for a job that finishes ordinarily.
+ */
+ @Test
+ public void testBlobServerCleanupFinishedJob() throws Exception {
+ testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY);
+ }
+
+ /**
+ * Test cleanup for a job which is cancelled after submission.
+ */
+ @Test
+ public void testBlobServerCleanupCancelledJob() throws Exception {
+ testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+ }
+
+ /**
+ * Test cleanup for a job that fails (first a task fails, then the job recovers, then the whole
+ * job fails due to a limited restart policy).
+ */
+ @Test
+ public void testBlobServerCleanupFailedJob() throws Exception {
+ testBlobServerCleanup(TestCase.JOB_FAILS);
+ }
+
+ /**
+ * Test cleanup for a job that fails job submission (emulated by an additional BLOB not being
+ * present).
+ */
+ @Test
+ public void testBlobServerCleanupFailedSubmission() throws Exception {
+ testBlobServerCleanup(TestCase.JOB_SUBMISSION_FAILS);
+ }
+
+ private void testBlobServerCleanup(final TestCase testCase) throws Exception {
+ final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+ final int numTasks = 2;
+ final Deadline timeout = Deadline.fromNow(Duration.ofSeconds(30L));
+
+ final JobGraph jobGraph = createJobGraph(testCase, numTasks);
+ final JobID jid = jobGraph.getJobID();
+
+ // upload a blob
+ final File tempBlob = File.createTempFile("Required", ".jar");
+ final int blobPort = miniCluster.getClusterInformation().getBlobServerPort();
+ List<PermanentBlobKey> keys =
+ BlobClient.uploadFiles(new InetSocketAddress("localhost", blobPort),
+ configuration, jid,
+ Collections.singletonList(new Path(tempBlob.getAbsolutePath())));
+ assertThat(keys, hasSize(1));
+ jobGraph.addUserJarBlobKey(keys.get(0));
+
+ if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
+ // add an invalid key so that the submission fails
+ jobGraph.addUserJarBlobKey(new PermanentBlobKey());
+ }
+
+ final CompletableFuture<JobSubmissionResult> submissionFuture = miniCluster.submitJob(jobGraph);
+
+ if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
+ try {
+ submissionFuture.get();
+ fail("Expected job submission failure.");
+ } catch (ExecutionException e) {
+ assertThat(ExceptionUtils.findThrowable(e, JobSubmissionException.class).isPresent(), is(true));
+ }
+ } else {
+ final JobSubmissionResult jobSubmissionResult = submissionFuture.get();
+
+ assertThat(jobSubmissionResult.getJobID(), is(jid));
+
+ final CompletableFuture<JobResult> resultFuture = miniCluster.requestJobResult(jid);
+
+ if (testCase == TestCase.JOB_FAILS) {
+ // fail a task so that the job is going to be recovered (we actually do not
+ // need the blocking part of the invokable and can start throwing right away)
+ FailingBlockingInvokable.unblock();
+
+ // job will get restarted, BlobCache may re-download the BLOB if already deleted
+ // then the tasks will fail again and the restart strategy will finalise the job
+ final JobResult jobResult = resultFuture.get();
+ assertThat(jobResult.isSuccess(), is(false));
+ assertThat(jobResult.getApplicationStatus(), is(ApplicationStatus.FAILED));
+ } else if (testCase == TestCase.JOB_IS_CANCELLED) {
+
+ miniCluster.cancelJob(jid);
+
+ final JobResult jobResult = resultFuture.get();
+ assertThat(jobResult.isSuccess(), is(false));
+ assertThat(jobResult.getApplicationStatus(), is(ApplicationStatus.CANCELED));
+ } else {
+ final JobResult jobResult = resultFuture.get();
+ assertThat(jobResult.isSuccess(), is(true));
+ }
+
+ }
+
+ // both BlobServer and BlobCache should eventually delete all files
+
+ File[] blobDirs = blobBaseDir.listFiles((dir, name) -> name.startsWith("blobStore-"));
+ assertNotNull(blobDirs);
+ for (File blobDir : blobDirs) {
+ waitForEmptyBlobDir(blobDir, timeout.timeLeft());
+ }
+ }
+
+ @Nonnull
+ private JobGraph createJobGraph(TestCase testCase, int numTasks) {
+ JobVertex source = new JobVertex("Source");
+ if (testCase == TestCase.JOB_FAILS || testCase == TestCase.JOB_IS_CANCELLED) {
+ source.setInvokableClass(FailingBlockingInvokable.class);
+ } else {
+ source.setInvokableClass(NoOpInvokable.class);
+ }
+ source.setParallelism(numTasks);
+
+ return new JobGraph("BlobCleanupTest", source);
+ }
+
+ /**
+ * Waits until the given {@link org.apache.flink.runtime.blob.BlobService} storage directory
+ * does not contain any job-related folders any more.
+ *
+ * @param blobDir
+ * directory of a {@link org.apache.flink.runtime.blob.BlobServer} or {@link
+ * org.apache.flink.runtime.blob.BlobCacheService}
+ * @param remaining
+ * remaining time for this test
+ *
+ * @see org.apache.flink.runtime.blob.BlobUtils
+ */
+ private static void waitForEmptyBlobDir(File blobDir, Duration remaining)
+ throws InterruptedException {
+ long deadline = System.currentTimeMillis() + remaining.toMillis();
+ String[] blobDirContents;
+ final FilenameFilter jobDirFilter = (dir, name) -> name.startsWith("job_");
+
+ do {
+ blobDirContents = blobDir.list(jobDirFilter);
+ if (blobDirContents == null || blobDirContents.length == 0) {
+ return;
+ }
+ Thread.sleep(RETRY_INTERVAL);
+ } while (System.currentTimeMillis() < deadline);
+
+ fail("Timeout while waiting for " + blobDir.getAbsolutePath() + " to become empty. Current contents: " + Arrays.toString(blobDirContents));
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
deleted file mode 100644
index 5c57bd9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed
- * after job termination.
- */
-public class JobManagerCleanupITCase extends TestLogger {
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- private static ActorSystem system;
-
- @BeforeClass
- public static void setup() {
- system = AkkaUtils.createLocalActorSystem(new Configuration());
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- }
-
- /**
- * Specifies which test case to run in {@link #testBlobServerCleanup(TestCase)}.
- */
- private enum TestCase {
- JOB_FINISHES_SUCESSFULLY,
- JOB_IS_CANCELLED,
- JOB_FAILS,
- JOB_SUBMISSION_FAILS
- }
-
- /**
- * Test cleanup for a job that finishes ordinarily.
- */
- @Test
- public void testBlobServerCleanupFinishedJob() throws IOException {
- testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY);
- }
-
- /**
- * Test cleanup for a job which is cancelled after submission.
- */
- @Test
- public void testBlobServerCleanupCancelledJob() throws IOException {
- testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
- }
-
- /**
- * Test cleanup for a job that fails (first a task fails, then the job recovers, then the whole
- * job fails due to a limited restart policy).
- */
- @Test
- public void testBlobServerCleanupFailedJob() throws IOException {
- testBlobServerCleanup(TestCase.JOB_FAILS);
- }
-
- /**
- * Test cleanup for a job that fails job submission (emulated by an additional BLOB not being
- * present).
- */
- @Test
- public void testBlobServerCleanupFailedSubmission() throws IOException {
- testBlobServerCleanup(TestCase.JOB_SUBMISSION_FAILS);
- }
-
- private void testBlobServerCleanup(final TestCase testCase) throws IOException {
- final int num_tasks = 2;
- final File blobBaseDir = tmpFolder.newFolder();
-
- new JavaTestKit(system) {{
- new Within(duration("30 seconds")) {
- @Override
- protected void run() {
- // Setup
-
- TestingCluster cluster = null;
- File tempBlob = null;
-
- try {
- Configuration config = new Configuration();
- config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
- config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
-
- config.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
- config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
- config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1 s");
- // BLOBs are deleted from BlobCache between 1s and 2s after last reference
- // -> the BlobCache may still have the BLOB or not (let's test both cases randomly)
- config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
-
- cluster = new TestingCluster(config);
- cluster.start();
-
- final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
- TestingUtils.TESTING_DURATION());
-
- // we can set the leader session ID to None because we don't use this gateway to send messages
- final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(),
- HighAvailabilityServices.DEFAULT_LEADER_ID);
-
- // Create a task
-
- JobVertex source = new JobVertex("Source");
- if (testCase == TestCase.JOB_FAILS || testCase == TestCase.JOB_IS_CANCELLED) {
- source.setInvokableClass(FailingBlockingInvokable.class);
- } else {
- source.setInvokableClass(NoOpInvokable.class);
- }
- source.setParallelism(num_tasks);
-
- JobGraph jobGraph = new JobGraph("BlobCleanupTest", source);
- final JobID jid = jobGraph.getJobID();
-
- // request the blob port from the job manager
- Future<Object> future = jobManagerGateway
- .ask(JobManagerMessages.getRequestBlobManagerPort(), remaining());
- int blobPort = (Integer) Await.result(future, remaining());
-
- // upload a blob
- tempBlob = File.createTempFile("Required", ".jar");
- List<PermanentBlobKey> keys =
- BlobClient.uploadFiles(new InetSocketAddress("localhost", blobPort),
- config, jid,
- Collections.singletonList(new Path(tempBlob.getAbsolutePath())));
- assertEquals(1, keys.size());
- jobGraph.addUserJarBlobKey(keys.get(0));
-
- if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
- // add an invalid key so that the submission fails
- jobGraph.addUserJarBlobKey(new PermanentBlobKey());
- }
-
- // Submit the job and wait for all vertices to be running
- jobManagerGateway.tell(
- new JobManagerMessages.SubmitJob(
- jobGraph,
- // NOTE: to not receive two different (arbitrarily ordered) messages
- // upon cancellation, only listen for the job submission
- // message when cancelling the job
- testCase == TestCase.JOB_IS_CANCELLED ?
- ListeningBehaviour.DETACHED :
- ListeningBehaviour.EXECUTION_RESULT
- ),
- testActorGateway);
- if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
- expectMsgClass(JobManagerMessages.JobResultFailure.class);
- } else {
- expectMsgEquals(new JobManagerMessages.JobSubmitSuccess(jid));
-
- if (testCase == TestCase.JOB_FAILS) {
- // fail a task so that the job is going to be recovered (we actually do not
- // need the blocking part of the invokable and can start throwing right away)
- FailingBlockingInvokable.unblock();
-
- // job will get restarted, BlobCache may re-download the BLOB if already deleted
- // then the tasks will fail again and the restart strategy will finalise the job
-
- expectMsgClass(JobManagerMessages.JobResultFailure.class);
- } else if (testCase == TestCase.JOB_IS_CANCELLED) {
- jobManagerGateway.tell(
- new JobManagerMessages.CancelJob(jid),
- testActorGateway);
-
- expectMsgEquals(new JobManagerMessages.CancellationSuccess(jid, null));
- } else {
- expectMsgClass(JobManagerMessages.JobResultSuccess.class);
- }
- }
-
- // both BlobServer and BlobCache should eventually delete all files
-
- File[] blobDirs = blobBaseDir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith("blobStore-");
- }
- });
- assertNotNull(blobDirs);
- for (File blobDir : blobDirs) {
- waitForEmptyBlobDir(blobDir, remaining());
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- if (cluster != null) {
- cluster.stop();
- }
- if (tempBlob != null) {
- assertTrue(tempBlob.delete());
- }
- }
- }
- };
- }};
-
- // after everything has been shut down, the storage directory itself should be empty
- assertArrayEquals(new File[] {}, blobBaseDir.listFiles());
- }
-
- /**
- * Waits until the given {@link org.apache.flink.runtime.blob.BlobService} storage directory
- * does not contain any job-related folders any more.
- *
- * @param blobDir
- * directory of a {@link org.apache.flink.runtime.blob.BlobServer} or {@link
- * org.apache.flink.runtime.blob.BlobCacheService}
- * @param remaining
- * remaining time for this test
- *
- * @see org.apache.flink.runtime.blob.BlobUtils
- */
- private static void waitForEmptyBlobDir(File blobDir, FiniteDuration remaining)
- throws InterruptedException {
- long deadline = System.currentTimeMillis() + remaining.toMillis();
- String[] blobDirContents;
- do {
- blobDirContents = blobDir.list(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith("job_");
- }
- });
- if (blobDirContents == null || blobDirContents.length == 0) {
- return;
- }
- Thread.sleep(100);
- } while (System.currentTimeMillis() < deadline);
-
- fail("Timeout while waiting for " + blobDir.getAbsolutePath() + " to become empty. Current contents: " + Arrays.toString(blobDirContents));
- }
-}