You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/06 16:55:41 UTC

[flink] 02/03: [FLINK-25954][runtime] Adds cleanup testcases to BlobServerCleanupTest

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9cac58227d2c28e7fb5262cc76b3f5b15515a73c
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Feb 2 00:20:55 2022 +0100

    [FLINK-25954][runtime] Adds cleanup testcases to BlobServerCleanupTest
---
 .../flink/runtime/blob/BlobServerCleanupTest.java  | 181 ++++++++++++++++++++-
 1 file changed, 173 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
index 5b484b4..15cc61e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.TriConsumerWithException;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -41,12 +42,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -58,6 +62,7 @@ import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** A few tests for the cleanup of transient BLOBs at the {@link BlobServer}. */
 public class BlobServerCleanupTest extends TestLogger {
@@ -73,6 +78,21 @@ public class BlobServerCleanupTest extends TestLogger {
         return randomData;
     }
 
+    private static BlobServer createTestInstance(String storageDirectoryPath, long cleanupInterval)
+            throws IOException {
+        return createTestInstance(storageDirectoryPath, cleanupInterval, new VoidBlobStore());
+    }
+
+    private static BlobServer createTestInstance(
+            String storageDirectoryPath, long cleanupInterval, BlobStore blobStore)
+            throws IOException {
+        final Configuration config = new Configuration();
+        config.setString(BlobServerOptions.STORAGE_DIRECTORY, storageDirectoryPath);
+        config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
+
+        return new BlobServer(config, new File(storageDirectoryPath), blobStore);
+    }
+
     @Test
     public void testTransientBlobNoJobCleanup()
             throws IOException, InterruptedException, ExecutionException {
@@ -102,13 +122,10 @@ public class BlobServerCleanupTest extends TestLogger {
         byte[] data = createRandomData();
         byte[] data2 = createRandomData();
 
-        Configuration config = new Configuration();
-        config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
-
         long cleanupLowerBound;
 
         try (BlobServer server =
-                new BlobServer(config, temporaryFolder, new VoidBlobStore())) {
+                createTestInstance(temporaryFolder.getAbsolutePath(), cleanupInterval)) {
 
             ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> transientBlobExpiryTimes =
                     server.getBlobExpiryTimes();
@@ -195,6 +212,154 @@ public class BlobServerCleanupTest extends TestLogger {
     }
 
     @Test
+    public void testLocalCleanup() throws Exception {
+        final TestingBlobStore blobStore =
+                createTestingBlobStoreBuilder()
+                        .setDeleteAllFunction(
+                                jobDataToDelete ->
+                                        fail(
+                                                "No deleteAll call is expected to be triggered but was for %s.",
+                                                jobDataToDelete))
+                        .createTestingBlobStore();
+        testSuccessfulCleanup(
+                new JobID(),
+                (testInstance, jobId, executor) ->
+                        testInstance.localCleanupAsync(jobId, executor).join(),
+                blobStore);
+    }
+
+    @Test
+    public void testGlobalCleanup() throws Exception {
+        final Set<JobID> actuallyDeletedJobData = new HashSet<>();
+        final JobID jobId = new JobID();
+        final TestingBlobStore blobStore =
+                createTestingBlobStoreBuilder()
+                        .setDeleteAllFunction(
+                                jobDataToDelete -> {
+                                    actuallyDeletedJobData.add(jobDataToDelete);
+                                    return true;
+                                })
+                        .createTestingBlobStore();
+        testSuccessfulCleanup(
+                jobId,
+                (testInstance, jobIdForCleanup, executor) ->
+                        testInstance.globalCleanupAsync(jobIdForCleanup, executor).join(),
+                blobStore);
+
+        assertThat(actuallyDeletedJobData).containsExactlyInAnyOrder(jobId);
+    }
+
+    @Test
+    public void testGlobalCleanupUnsuccessfulInBlobStore() throws Exception {
+        final TestingBlobStore blobStore =
+                createTestingBlobStoreBuilder()
+                        .setDeleteAllFunction(jobDataToDelete -> false)
+                        .createTestingBlobStore();
+
+        testFailedCleanup(
+                new JobID(),
+                (testInstance, jobId, executor) ->
+                        assertThat(testInstance.globalCleanupAsync(new JobID(), executor))
+                                .failsWithin(Duration.ofMillis(100))
+                                .withThrowableOfType(ExecutionException.class)
+                                .withCauseInstanceOf(IOException.class),
+                blobStore);
+    }
+
+    @Test
+    public void testGlobalCleanupFailureInBlobStore() throws Exception {
+        final RuntimeException actualException = new RuntimeException("Expected RuntimeException");
+        final TestingBlobStore blobStore =
+                createTestingBlobStoreBuilder()
+                        .setDeleteAllFunction(
+                                jobDataToDelete -> {
+                                    throw actualException;
+                                })
+                        .createTestingBlobStore();
+
+        testFailedCleanup(
+                new JobID(),
+                (testInstance, jobId, executor) ->
+                        assertThat(testInstance.globalCleanupAsync(new JobID(), executor))
+                                .failsWithin(Duration.ofMillis(100))
+                                .withThrowableOfType(ExecutionException.class)
+                                .withCause(actualException),
+                blobStore);
+    }
+
+    private TestingBlobStoreBuilder createTestingBlobStoreBuilder() {
+        return new TestingBlobStoreBuilder()
+                .setDeleteFunction(
+                        (jobId, blobKey) -> {
+                            throw new UnsupportedOperationException(
+                                    "Deletion of individual blobs is not supported.");
+                        });
+    }
+
+    private void testFailedCleanup(
+            JobID jobId,
+            TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback,
+            BlobStore blobStore)
+            throws Exception {
+        testCleanup(jobId, callback, blobStore, 2);
+    }
+
+    private void testSuccessfulCleanup(
+            JobID jobId,
+            TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback,
+            BlobStore blobStore)
+            throws Exception {
+        testCleanup(jobId, callback, blobStore, 0);
+    }
+
+    private void testCleanup(
+            JobID jobId,
+            TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback,
+            BlobStore blobStore,
+            int expectedFileCountAfterCleanup)
+            throws Exception {
+        final JobID otherJobId = new JobID();
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try (BlobServer testInstance =
+                createTestInstance(
+                        temporaryFolder.getAbsolutePath(), Integer.MAX_VALUE, blobStore)) {
+            testInstance.start();
+
+            final BlobKey transientDataBlobKey =
+                    put(testInstance, jobId, createRandomData(), TRANSIENT_BLOB);
+            final BlobKey otherTransientDataBlobKey =
+                    put(testInstance, otherJobId, createRandomData(), TRANSIENT_BLOB);
+
+            final BlobKey permanentDataBlobKey =
+                    put(testInstance, jobId, createRandomData(), PERMANENT_BLOB);
+            final BlobKey otherPermanentDataBlobKey =
+                    put(testInstance, otherJobId, createRandomData(), PERMANENT_BLOB);
+
+            checkFilesExist(
+                    jobId,
+                    Arrays.asList(transientDataBlobKey, permanentDataBlobKey),
+                    testInstance,
+                    true);
+            checkFilesExist(
+                    otherJobId,
+                    Arrays.asList(otherTransientDataBlobKey, otherPermanentDataBlobKey),
+                    testInstance,
+                    true);
+
+            callback.accept(testInstance, jobId, executorService);
+
+            checkFileCountForJob(expectedFileCountAfterCleanup, jobId, testInstance);
+            checkFilesExist(
+                    otherJobId,
+                    Arrays.asList(otherTransientDataBlobKey, otherPermanentDataBlobKey),
+                    testInstance,
+                    true);
+        } finally {
+            assertThat(executorService.shutdownNow()).isEmpty();
+        }
+    }
+
+    @Test
     public void testBlobServerExpiresRecoveredTransientJobBlob() throws Exception {
         runBlobServerExpiresRecoveredTransientBlob(new JobID());
     }
@@ -207,8 +372,6 @@ public class BlobServerCleanupTest extends TestLogger {
     private void runBlobServerExpiresRecoveredTransientBlob(@Nullable JobID jobId)
             throws Exception {
         final long cleanupInterval = 1L;
-        final Configuration configuration = new Configuration();
-        configuration.set(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
 
         final TransientBlobKey transientBlobKey =
                 TestingBlobUtils.writeTransientBlob(
@@ -216,7 +379,7 @@ public class BlobServerCleanupTest extends TestLogger {
         final File blob = BlobUtils.getStorageLocation(temporaryFolder, jobId, transientBlobKey);
 
         try (final BlobServer blobServer =
-                new BlobServer(configuration, temporaryFolder, new VoidBlobStore())) {
+                createTestInstance(temporaryFolder.getAbsolutePath(), cleanupInterval)) {
             CommonTestUtils.waitUntilCondition(
                     () -> !blob.exists(),
                     Deadline.fromNow(Duration.ofSeconds(cleanupInterval * 5L)),
@@ -237,7 +400,9 @@ public class BlobServerCleanupTest extends TestLogger {
 
         final ExecutorService executorService = Executors.newSingleThreadExecutor();
         try (final BlobServer blobServer =
-                new BlobServer(new Configuration(), temporaryFolder, new VoidBlobStore())) {
+                createTestInstance(
+                        temporaryFolder.getAbsolutePath(),
+                        BlobServerOptions.CLEANUP_INTERVAL.defaultValue())) {
             blobServer.retainJobs(Collections.singleton(jobId1), executorService);
 
             assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent);