You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/04 08:02:19 UTC
[flink] 01/10: [hotfix] Makes FileSystemBlobStore.(delete|deleteAll) comply to the BlobStore interface
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5a665b08111cba624d353ae1992c4cbfcfada033
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Sat Jan 22 16:59:59 2022 +0100
[hotfix] Makes FileSystemBlobStore.(delete|deleteAll) comply to the BlobStore interface
---
.../flink/runtime/blob/FileSystemBlobStore.java | 8 +-
.../runtime/blob/FileSystemBlobStoreTest.java | 229 +++++++++++++++++++++
2 files changed, 236 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index d03cc74..e8b53cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -158,7 +158,13 @@ public class FileSystemBlobStore implements BlobStoreService {
Path path = new Path(blobPath);
- boolean result = fileSystem.delete(path, true);
+ boolean result = true;
+ if (fileSystem.exists(path)) {
+ result = fileSystem.delete(path, true);
+ } else {
+ LOG.debug(
+ "The given path {} is not present anymore. No deletion is required.", path);
+ }
// send a call to delete the directory containing the file. This will
// fail (and be ignored) when some files still exist.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
index 5b72270..479f584 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
@@ -22,26 +22,255 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.local.LocalDataOutputStream;
import org.apache.flink.runtime.state.filesystem.TestFs;
+import org.apache.flink.testutils.TestFileSystem;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
+import java.security.MessageDigest;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link FileSystemBlobStore}. */
@ExtendWith(TestLoggerExtension.class)
class FileSystemBlobStoreTest {
+ private FileSystemBlobStore testInstance;
+ private Path storagePath;
+
+ @BeforeEach
+ public void createTestInstance(@TempDir Path storagePath) throws IOException {
+ this.testInstance = new FileSystemBlobStore(new TestFileSystem(), storagePath.toString());
+ this.storagePath = storagePath;
+ }
+
+ @AfterEach
+ public void finalizeTestInstance() throws IOException {
+ testInstance.close();
+ }
+
+ @Test
+ public void testSuccessfulPut() throws IOException {
+ final Path temporaryFile = createTemporaryFileWithContent("put");
+
+ final JobID jobId = new JobID();
+ final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile);
+ assertThat(getBlobDirectoryPath()).isEmptyDirectory();
+
+ final boolean successfullyWritten =
+ testInstance.put(temporaryFile.toFile(), jobId, blobKey);
+ assertThat(successfullyWritten).isTrue();
+
+ assertThat(getPath(jobId)).isDirectory().exists();
+ assertThat(getPath(jobId, blobKey)).isNotEmptyFile().hasSameTextualContentAs(temporaryFile);
+ }
+
+ @Test
+ public void testMissingFilePut() throws IOException {
+ assertThatThrownBy(
+ () ->
+ testInstance.put(
+ new File("/not/existing/file"),
+ new JobID(),
+ new PermanentBlobKey()))
+ .isInstanceOf(FileNotFoundException.class);
+ }
+
+ @Test
+ public void testSuccessfulGet() throws IOException {
+ final Path temporaryFile = createTemporaryFileWithContent("get");
+ final JobID jobId = new JobID();
+ final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile);
+
+ assertThat(testInstance.put(temporaryFile.toFile(), jobId, blobKey)).isTrue();
+
+ final Path targetFile = Files.createTempFile("filesystemblobstoretest-get-target-", "");
+ assertThat(targetFile).isEmptyFile();
+ final boolean successfullyGet = testInstance.get(jobId, blobKey, targetFile.toFile());
+ assertThat(successfullyGet).isTrue();
+
+ assertThat(targetFile).hasSameTextualContentAs(temporaryFile);
+ }
+
+ @Test
+ public void testGetWithWrongJobId() throws IOException {
+ final Path temporaryFile = createTemporaryFileWithContent("get");
+ final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile);
+
+ assertThat(testInstance.put(temporaryFile.toFile(), new JobID(), blobKey)).isTrue();
+
+ assertThatThrownBy(
+ () ->
+ testInstance.get(
+ new JobID(),
+ blobKey,
+ Files.createTempFile(
+ "filesystemblobstoretest-get-with-wrong-jobid-",
+ "")
+ .toFile()))
+ .isInstanceOf(FileNotFoundException.class);
+ }
+
+ @Test
+ public void testGetWithWrongBlobKey() throws IOException {
+ final Path temporaryFile = createTemporaryFileWithContent("get");
+
+ final JobID jobId = new JobID();
+ assertThat(testInstance.put(temporaryFile.toFile(), jobId, new PermanentBlobKey()))
+ .isTrue();
+
+ assertThatThrownBy(
+ () ->
+ testInstance.get(
+ jobId,
+ new PermanentBlobKey(),
+ Files.createTempFile(
+ "filesystemblobstoretest-get-with-wrong-blobkey-",
+ "")
+ .toFile()))
+ .isInstanceOf(FileNotFoundException.class);
+ }
+
+ @Test
+ public void testSuccessfulDeleteOnlyBlob() throws IOException {
+ final Path temporaryFile = createTemporaryFileWithContent("delete");
+ final JobID jobId = new JobID();
+ final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile);
+
+ assertThat(testInstance.put(temporaryFile.toFile(), jobId, blobKey)).isTrue();
+
+ assertThat(getPath(jobId)).isDirectory().exists();
+ assertThat(getPath(jobId, blobKey)).isNotEmptyFile();
+
+ final boolean successfullyDeleted = testInstance.delete(jobId, blobKey);
+
+ assertThat(successfullyDeleted).isTrue();
+ assertThat(getPath(jobId)).doesNotExist();
+ }
+
+ @Test
+ public void testSuccessfulDeleteBlob() throws IOException {
+ final Path temporaryFile = createTemporaryFileWithContent("delete");
+ final JobID jobId = new JobID();
+ final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile);
+ final BlobKey otherBlobKey = new PermanentBlobKey();
+
+ assertThat(testInstance.put(temporaryFile.toFile(), jobId, blobKey)).isTrue();
+ // create another artifact to omit deleting the directory
+ assertThat(testInstance.put(temporaryFile.toFile(), jobId, otherBlobKey)).isTrue();
+
+ assertThat(getPath(jobId)).isDirectory().exists();
+ assertThat(getPath(jobId, blobKey)).isNotEmptyFile();
+ assertThat(getPath(jobId, otherBlobKey)).isNotEmptyFile();
+
+ final boolean successfullyDeleted = testInstance.delete(jobId, blobKey);
+
+ assertThat(successfullyDeleted).isTrue();
+ assertThat(getPath(jobId, otherBlobKey)).exists();
+ }
+
+ @Test
+ public void testDeleteWithNotExistingJobId() {
+ assertThat(testInstance.delete(new JobID(), new PermanentBlobKey())).isTrue();
+ }
+
+ @Test
+ public void testDeleteWithNotExistingBlobKey() throws IOException {
+ final Path temporaryFile = createTemporaryFileWithContent("delete");
+ final JobID jobId = new JobID();
+ final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile);
+
+ assertThat(testInstance.put(temporaryFile.toFile(), jobId, blobKey)).isTrue();
+ assertThat(testInstance.delete(jobId, new PermanentBlobKey())).isTrue();
+ assertThat(getPath(jobId, blobKey)).exists();
+ }
+
+ @Test
+ public void testDeleteAll() throws IOException {
+ final Path temporaryFile = createTemporaryFileWithContent("delete");
+ final JobID jobId = new JobID();
+
+ assertThat(testInstance.put(temporaryFile.toFile(), jobId, new PermanentBlobKey()))
+ .isTrue();
+ assertThat(testInstance.put(temporaryFile.toFile(), jobId, new PermanentBlobKey()))
+ .isTrue();
+
+ assertThat(getPath(jobId)).isDirectory().exists();
+ assertThat(getPath(jobId).toFile().listFiles()).hasSize(2);
+
+ assertThat(testInstance.deleteAll(jobId)).isTrue();
+ assertThat(getPath(jobId)).doesNotExist();
+ }
+
+ @Test
+ public void testDeleteAllWithNotExistingJobId() {
+ final JobID jobId = new JobID();
+ assertThat(testInstance.deleteAll(jobId)).isTrue();
+ assertThat(getPath(jobId)).doesNotExist();
+ }
+
+ private Path createTemporaryFileWithContent(String operationLabel) throws IOException {
+ final String actualContent =
+ String.format("Content for testing the %s operation", operationLabel);
+ final Path temporaryFile =
+ Files.createTempFile(
+ String.format("filesystemblobstoretest-%s-", operationLabel), "");
+ try (BufferedWriter writer =
+ new BufferedWriter(new FileWriter(temporaryFile.toAbsolutePath().toString()))) {
+ writer.write(actualContent);
+ }
+
+ return temporaryFile;
+ }
+
+ private Path getBlobDirectoryPath() {
+ return storagePath.resolve(FileSystemBlobStore.BLOB_PATH_NAME);
+ }
+
+ private Path getPath(JobID jobId) {
+ return getBlobDirectoryPath().resolve(String.format("job_%s", jobId));
+ }
+
+ private Path getPath(JobID jobId, BlobKey blobKey) {
+ return getPath(jobId).resolve(String.format("blob_%s", blobKey));
+ }
+
+ private BlobKey createPermanentBlobKeyFromFile(Path path) throws IOException {
+ Preconditions.checkArgument(!Files.isDirectory(path));
+ Preconditions.checkArgument(Files.exists(path));
+
+ MessageDigest md = BlobUtils.createMessageDigest();
+ try (InputStream is = new FileInputStream(path.toFile())) {
+ final byte[] buf = new byte[1024];
+ int bytesRead = is.read(buf);
+ while (bytesRead >= 0) {
+ md.update(buf, 0, bytesRead);
+ bytesRead = is.read(buf);
+ }
+
+ return BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB, md.digest());
+ }
+ }
+
@Test
public void fileSystemBlobStoreCallsSyncOnPut(@TempDir Path storageDirectory)
throws IOException {