You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/06 16:55:40 UTC
[flink] 01/03: [hotfix] Migrates BlobServerCleanupTest to JUnit5/AssertJ
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 59ccc685d83f7b6f6b8f20dba590a418085c8f00
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Sat Jan 22 16:58:52 2022 +0100
[hotfix] Migrates BlobServerCleanupTest to JUnit5/AssertJ
---
.../flink/runtime/blob/BlobServerCleanupTest.java | 52 +++++++++++-----------
1 file changed, 27 insertions(+), 25 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
index f4fa398..5b484b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
@@ -28,9 +28,8 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nullable;
@@ -59,14 +58,20 @@ import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
/** A few tests for the cleanup of transient BLOBs at the {@link BlobServer}. */
public class BlobServerCleanupTest extends TestLogger {
- private final Random rnd = new Random();
+ private static final Random RANDOM = new Random();
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir private File temporaryFolder;
+
+ private static byte[] createRandomData() {
+ final byte[] randomData = new byte[2000000];
+ RANDOM.nextBytes(randomData);
+
+ return randomData;
+ }
@Test
public void testTransientBlobNoJobCleanup()
@@ -94,9 +99,8 @@ public class BlobServerCleanupTest extends TestLogger {
final List<CompletableFuture<Void>> getOperations =
new ArrayList<>(numberConcurrentGetOperations);
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
- byte[] data2 = Arrays.copyOfRange(data, 10, 54);
+ byte[] data = createRandomData();
+ byte[] data2 = createRandomData();
Configuration config = new Configuration();
config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
@@ -104,7 +108,7 @@ public class BlobServerCleanupTest extends TestLogger {
long cleanupLowerBound;
try (BlobServer server =
- new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore())) {
+ new BlobServer(config, temporaryFolder, new VoidBlobStore())) {
ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> transientBlobExpiryTimes =
server.getBlobExpiryTimes();
@@ -136,14 +140,16 @@ public class BlobServerCleanupTest extends TestLogger {
final Long key1ExpiryAfterGet = transientBlobExpiryTimes.get(Tuple2.of(jobId, key1));
assertThat(key1ExpiryAfterGet).isGreaterThan(key1ExpiryAfterPut);
assertThat(key1ExpiryAfterGet).isGreaterThanOrEqualTo(cleanupLowerBound);
- assertEquals(key2ExpiryAfterPut, transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)));
+ assertThat(key2ExpiryAfterPut)
+ .isEqualTo(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)));
// access key2, verify expiry times (delay at least 1ms to also verify key1 expiry is
// unchanged)
Thread.sleep(1);
cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
verifyContents(server, jobId, key2, data2);
- assertEquals(key1ExpiryAfterGet, transientBlobExpiryTimes.get(Tuple2.of(jobId, key1)));
+ assertThat(key1ExpiryAfterGet)
+ .isEqualTo(transientBlobExpiryTimes.get(Tuple2.of(jobId, key1)));
assertThat(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)))
.isGreaterThan(key2ExpiryAfterPut);
assertThat(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)))
@@ -203,15 +209,14 @@ public class BlobServerCleanupTest extends TestLogger {
final long cleanupInterval = 1L;
final Configuration configuration = new Configuration();
configuration.set(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
- final File storageDirectory = temporaryFolder.newFolder();
final TransientBlobKey transientBlobKey =
TestingBlobUtils.writeTransientBlob(
- storageDirectory.toPath(), jobId, new byte[] {1, 2, 3, 4});
- final File blob = BlobUtils.getStorageLocation(storageDirectory, jobId, transientBlobKey);
+ temporaryFolder.toPath(), jobId, new byte[] {1, 2, 3, 4});
+ final File blob = BlobUtils.getStorageLocation(temporaryFolder, jobId, transientBlobKey);
try (final BlobServer blobServer =
- new BlobServer(configuration, storageDirectory, new VoidBlobStore())) {
+ new BlobServer(configuration, temporaryFolder, new VoidBlobStore())) {
CommonTestUtils.waitUntilCondition(
() -> !blob.exists(),
Deadline.fromNow(Duration.ofSeconds(cleanupInterval * 5L)),
@@ -221,20 +226,18 @@ public class BlobServerCleanupTest extends TestLogger {
@Test
public void testBlobServerRetainsJobs() throws Exception {
- final File storageDirectory = temporaryFolder.newFolder();
-
final JobID jobId1 = new JobID();
final JobID jobId2 = new JobID();
final byte[] fileContent = {1, 2, 3, 4};
final PermanentBlobKey blobKey1 =
- TestingBlobUtils.writePermanentBlob(storageDirectory.toPath(), jobId1, fileContent);
+ TestingBlobUtils.writePermanentBlob(temporaryFolder.toPath(), jobId1, fileContent);
final PermanentBlobKey blobKey2 =
- TestingBlobUtils.writePermanentBlob(storageDirectory.toPath(), jobId2, fileContent);
+ TestingBlobUtils.writePermanentBlob(temporaryFolder.toPath(), jobId2, fileContent);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try (final BlobServer blobServer =
- new BlobServer(new Configuration(), storageDirectory, new VoidBlobStore())) {
+ new BlobServer(new Configuration(), temporaryFolder, new VoidBlobStore())) {
blobServer.retainJobs(Collections.singleton(jobId1), executorService);
assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent);
@@ -317,10 +320,9 @@ public class BlobServerCleanupTest extends TestLogger {
throw new IOException("File " + jobDir + " does not exist.");
}
} else {
- assertEquals(
- "Too many/few files in job dir: " + Arrays.asList(blobsForJob).toString(),
- expectedCount,
- blobsForJob.length);
+ assertThat(blobsForJob.length)
+ .as("Too many/few files in job dir: " + Arrays.asList(blobsForJob))
+ .isEqualTo(expectedCount);
}
}
}