You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2020/05/29 06:58:10 UTC

[flink] branch release-1.11 updated: [FLINK-17463][tests] Avoid concurrent directory creation and deletion

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 647f762  [FLINK-17463][tests] Avoid concurrent directory creation and deletion
647f762 is described below

commit 647f76283c900048e12361cf96d26db2a184b10b
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed May 27 15:23:47 2020 +0200

    [FLINK-17463][tests] Avoid concurrent directory creation and deletion
    
    BlobCacheCleanupTest#testPermanentBlobCleanup() tests that job related
    files are cleaned up by a background task when the job is released from
    the PermanentBlobCache. The test asserts that the uploaded blobs are
    deleted from the filesystem. Because the scheduling of the background
    task cannot be controlled from outside the cache, the test polls the
    filesystem. More precisely, the test uses BlobUtils#getStorageLocation()
    to build the path on the filesystem given a blobkey and tests the
    existence of that path in regular intervals. As a side effect, however,
    BlobUtils#getStorageLocation() also creates all necessary directories to
    that path if they do not exist yet. This leads to a situation where
    directories are concurrently deleted and created, which can cause
    FileAlreadyExists exceptions. This commit fixes the issue.
    
    Note that the above applies to all tests that invoke
    BlobServerCleanupTest#checkFilesExist().
---
 .../java/org/apache/flink/runtime/blob/AbstractBlobCache.java  |  4 ++++
 .../main/java/org/apache/flink/runtime/blob/BlobServer.java    |  4 ++++
 .../org/apache/flink/runtime/blob/BlobServerCleanupTest.java   | 10 ++++++----
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
index ce12898..8a873f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -122,6 +122,10 @@ public abstract class AbstractBlobCache implements Closeable {
 		this.serverAddress = serverAddress;
 	}
 
+	public File getStorageDir() {
+		return storageDir;
+	}
+
 	/**
 	 * Returns local copy of the file for the BLOB with the given key.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index a47040c..a50f535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -213,6 +213,10 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 	//  Path Accessors
 	// --------------------------------------------------------------------------------------------
 
+	public File getStorageDir() {
+		return storageDir;
+	}
+
 	/**
 	 * Returns a file handle to the file associated with the given blob key on the blob
 	 * server.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
index aafba30..04c1187 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
@@ -204,20 +204,22 @@ public class BlobServerCleanupTest extends TestLogger {
 		int numFiles = 0;
 
 		for (BlobKey key : keys) {
-			final File blobFile;
+			final File storageDir;
 			if (blobService instanceof BlobServer) {
 				BlobServer server = (BlobServer) blobService;
-				blobFile = server.getStorageLocation(jobId, key);
+				storageDir = server.getStorageDir();
 			} else if (blobService instanceof PermanentBlobCache) {
 				PermanentBlobCache cache = (PermanentBlobCache) blobService;
-				blobFile = cache.getStorageLocation(jobId, key);
+				storageDir = cache.getStorageDir();
 			} else if (blobService instanceof TransientBlobCache) {
 				TransientBlobCache cache = (TransientBlobCache) blobService;
-				blobFile = cache.getStorageLocation(jobId, key);
+				storageDir = cache.getStorageDir();
 			} else {
 				throw new UnsupportedOperationException(
 					"unsupported BLOB service class: " + blobService.getClass().getCanonicalName());
 			}
+
+			final File blobFile = new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
 			if (blobFile.exists()) {
 				++numFiles;
 			} else if (doThrow) {