You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/06 16:55:41 UTC
[flink] 02/03: [FLINK-25954][runtime] Adds cleanup testcases to BlobServerCleanupTest
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9cac58227d2c28e7fb5262cc76b3f5b15515a73c
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Feb 2 00:20:55 2022 +0100
[FLINK-25954][runtime] Adds cleanup testcases to BlobServerCleanupTest
---
.../flink/runtime/blob/BlobServerCleanupTest.java | 181 ++++++++++++++++++++-
1 file changed, 173 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
index 5b484b4..15cc61e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.TriConsumerWithException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -41,12 +42,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -58,6 +62,7 @@ import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
/** A few tests for the cleanup of transient BLOBs at the {@link BlobServer}. */
public class BlobServerCleanupTest extends TestLogger {
@@ -73,6 +78,21 @@ public class BlobServerCleanupTest extends TestLogger {
return randomData;
}
+ private static BlobServer createTestInstance(String storageDirectoryPath, long cleanupInterval)
+ throws IOException {
+ return createTestInstance(storageDirectoryPath, cleanupInterval, new VoidBlobStore());
+ }
+
+ private static BlobServer createTestInstance(
+ String storageDirectoryPath, long cleanupInterval, BlobStore blobStore)
+ throws IOException {
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, storageDirectoryPath);
+ config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
+
+ return new BlobServer(config, new File(storageDirectoryPath), blobStore);
+ }
+
@Test
public void testTransientBlobNoJobCleanup()
throws IOException, InterruptedException, ExecutionException {
@@ -102,13 +122,10 @@ public class BlobServerCleanupTest extends TestLogger {
byte[] data = createRandomData();
byte[] data2 = createRandomData();
- Configuration config = new Configuration();
- config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
-
long cleanupLowerBound;
try (BlobServer server =
- new BlobServer(config, temporaryFolder, new VoidBlobStore())) {
+ createTestInstance(temporaryFolder.getAbsolutePath(), cleanupInterval)) {
ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> transientBlobExpiryTimes =
server.getBlobExpiryTimes();
@@ -195,6 +212,154 @@ public class BlobServerCleanupTest extends TestLogger {
}
@Test
+ public void testLocalCleanup() throws Exception {
+ final TestingBlobStore blobStore =
+ createTestingBlobStoreBuilder()
+ .setDeleteAllFunction(
+ jobDataToDelete ->
+ fail(
+ "No deleteAll call is expected to be triggered but was for %s.",
+ jobDataToDelete))
+ .createTestingBlobStore();
+ testSuccessfulCleanup(
+ new JobID(),
+ (testInstance, jobId, executor) ->
+ testInstance.localCleanupAsync(jobId, executor).join(),
+ blobStore);
+ }
+
+ @Test
+ public void testGlobalCleanup() throws Exception {
+ final Set<JobID> actuallyDeletedJobData = new HashSet<>();
+ final JobID jobId = new JobID();
+ final TestingBlobStore blobStore =
+ createTestingBlobStoreBuilder()
+ .setDeleteAllFunction(
+ jobDataToDelete -> {
+ actuallyDeletedJobData.add(jobDataToDelete);
+ return true;
+ })
+ .createTestingBlobStore();
+ testSuccessfulCleanup(
+ jobId,
+ (testInstance, jobIdForCleanup, executor) ->
+ testInstance.globalCleanupAsync(jobIdForCleanup, executor).join(),
+ blobStore);
+
+ assertThat(actuallyDeletedJobData).containsExactlyInAnyOrder(jobId);
+ }
+
+ @Test
+ public void testGlobalCleanupUnsuccessfulInBlobStore() throws Exception {
+ final TestingBlobStore blobStore =
+ createTestingBlobStoreBuilder()
+ .setDeleteAllFunction(jobDataToDelete -> false)
+ .createTestingBlobStore();
+
+ testFailedCleanup(
+ new JobID(),
+ (testInstance, jobId, executor) ->
+ assertThat(testInstance.globalCleanupAsync(new JobID(), executor))
+ .failsWithin(Duration.ofMillis(100))
+ .withThrowableOfType(ExecutionException.class)
+ .withCauseInstanceOf(IOException.class),
+ blobStore);
+ }
+
+ @Test
+ public void testGlobalCleanupFailureInBlobStore() throws Exception {
+ final RuntimeException actualException = new RuntimeException("Expected RuntimeException");
+ final TestingBlobStore blobStore =
+ createTestingBlobStoreBuilder()
+ .setDeleteAllFunction(
+ jobDataToDelete -> {
+ throw actualException;
+ })
+ .createTestingBlobStore();
+
+ testFailedCleanup(
+ new JobID(),
+ (testInstance, jobId, executor) ->
+ assertThat(testInstance.globalCleanupAsync(new JobID(), executor))
+ .failsWithin(Duration.ofMillis(100))
+ .withThrowableOfType(ExecutionException.class)
+ .withCause(actualException),
+ blobStore);
+ }
+
+ private TestingBlobStoreBuilder createTestingBlobStoreBuilder() {
+ return new TestingBlobStoreBuilder()
+ .setDeleteFunction(
+ (jobId, blobKey) -> {
+ throw new UnsupportedOperationException(
+ "Deletion of individual blobs is not supported.");
+ });
+ }
+
+ private void testFailedCleanup(
+ JobID jobId,
+ TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback,
+ BlobStore blobStore)
+ throws Exception {
+ testCleanup(jobId, callback, blobStore, 2);
+ }
+
+ private void testSuccessfulCleanup(
+ JobID jobId,
+ TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback,
+ BlobStore blobStore)
+ throws Exception {
+ testCleanup(jobId, callback, blobStore, 0);
+ }
+
+ private void testCleanup(
+ JobID jobId,
+ TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback,
+ BlobStore blobStore,
+ int expectedFileCountAfterCleanup)
+ throws Exception {
+ final JobID otherJobId = new JobID();
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ try (BlobServer testInstance =
+ createTestInstance(
+ temporaryFolder.getAbsolutePath(), Integer.MAX_VALUE, blobStore)) {
+ testInstance.start();
+
+ final BlobKey transientDataBlobKey =
+ put(testInstance, jobId, createRandomData(), TRANSIENT_BLOB);
+ final BlobKey otherTransientDataBlobKey =
+ put(testInstance, otherJobId, createRandomData(), TRANSIENT_BLOB);
+
+ final BlobKey permanentDataBlobKey =
+ put(testInstance, jobId, createRandomData(), PERMANENT_BLOB);
+ final BlobKey otherPermanentDataBlobKey =
+ put(testInstance, otherJobId, createRandomData(), PERMANENT_BLOB);
+
+ checkFilesExist(
+ jobId,
+ Arrays.asList(transientDataBlobKey, permanentDataBlobKey),
+ testInstance,
+ true);
+ checkFilesExist(
+ otherJobId,
+ Arrays.asList(otherTransientDataBlobKey, otherPermanentDataBlobKey),
+ testInstance,
+ true);
+
+ callback.accept(testInstance, jobId, executorService);
+
+ checkFileCountForJob(expectedFileCountAfterCleanup, jobId, testInstance);
+ checkFilesExist(
+ otherJobId,
+ Arrays.asList(otherTransientDataBlobKey, otherPermanentDataBlobKey),
+ testInstance,
+ true);
+ } finally {
+ assertThat(executorService.shutdownNow()).isEmpty();
+ }
+ }
+
+ @Test
public void testBlobServerExpiresRecoveredTransientJobBlob() throws Exception {
runBlobServerExpiresRecoveredTransientBlob(new JobID());
}
@@ -207,8 +372,6 @@ public class BlobServerCleanupTest extends TestLogger {
private void runBlobServerExpiresRecoveredTransientBlob(@Nullable JobID jobId)
throws Exception {
final long cleanupInterval = 1L;
- final Configuration configuration = new Configuration();
- configuration.set(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
final TransientBlobKey transientBlobKey =
TestingBlobUtils.writeTransientBlob(
@@ -216,7 +379,7 @@ public class BlobServerCleanupTest extends TestLogger {
final File blob = BlobUtils.getStorageLocation(temporaryFolder, jobId, transientBlobKey);
try (final BlobServer blobServer =
- new BlobServer(configuration, temporaryFolder, new VoidBlobStore())) {
+ createTestInstance(temporaryFolder.getAbsolutePath(), cleanupInterval)) {
CommonTestUtils.waitUntilCondition(
() -> !blob.exists(),
Deadline.fromNow(Duration.ofSeconds(cleanupInterval * 5L)),
@@ -237,7 +400,9 @@ public class BlobServerCleanupTest extends TestLogger {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try (final BlobServer blobServer =
- new BlobServer(new Configuration(), temporaryFolder, new VoidBlobStore())) {
+ createTestInstance(
+ temporaryFolder.getAbsolutePath(),
+ BlobServerOptions.CLEANUP_INTERVAL.defaultValue())) {
blobServer.retainJobs(Collections.singleton(jobId1), executorService);
assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent);