Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2017/06/20 09:41:48 UTC

[GitHub] flink pull request #4146: [FLINK-6008] collection of BlobServer improvements

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4146

    [FLINK-6008] collection of BlobServer improvements

    This PR is a light-weight version of #3512 that only includes the improvements and can serve as a base for FLIP-19. It improves the following things around the `BlobServer`/`BlobCache`:
    
    * replace config options in `config.md` with non-deprecated ones, e.g. `high-availability.cluster-id` and `high-availability.storageDir`
    * do not fail the `BlobServer` when a delete operation fails
    * add more unit tests
    * general code style and docs improvements, like using `Preconditions.checkArgument`
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6008b

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4146.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4146
    
----
commit ce719ee39fbbca7b7828c17d9792fc87d37450c7
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-01-06T17:42:58Z

    [FLINK-6008][docs] update some config options to the new, non-deprecated ones

commit 9efa8808e46adc1253f52a6a8cec6d3b4d29fee3
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-20T15:49:57Z

    [FLINK-6008][docs] minor improvements in the BlobService docs

commit ca3d533b0affa645ec93d40de378dadc829bbfe5
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-20T17:27:13Z

    [FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 0eededeb36dd833835753def7f4bb27c9d5fb67e
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-09T17:14:02Z

    [FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 6249041a9db2b39ddf54e79a1aed5e7706e739c7
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-21T15:23:29Z

    [FLINK-6008] do not fail the BlobServer if delete fails
    
    also extend the delete tests and remove one code duplication

commit e681239a538547f752d65358db1ebd2ba312b33c
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-17T15:21:40Z

    [FLINK-6008] fix concurrent job directory creation
    
    also add according unit tests

commit 20beae2dbc91859e2ec724b35b20536dcd11fe90
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-18T14:37:37Z

    [FLINK-6008] some comments about BlobLibraryCacheManager cleanup

commit 8a33517fe6eb2fa932ab17cb0d82a3fa8d7b8d0b
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-19T13:39:03Z

    [hotfix] minor typos

commit 23889866ac21494fc4af90905ab1518cbe897118
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-19T14:10:16Z

    [FLINK-6008] further cleanup tests for BlobLibraryCacheManager

commit 01b1a245528c264a6061ed3a48b24c5a207369f6
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-06-14T16:01:47Z

    [FLINK-6008] do not guard a delete() call with a check for existence

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r125165567
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
    @@ -540,7 +526,7 @@ else if (type == NAME_ADDRESSABLE) {
     					// we should make the local and remote file deletion atomic, otherwise we might risk not
     					// removing the remote file in case of a concurrent put operation
     					if (blobFile.exists() && !blobFile.delete()) {
    --- End diff --
    
    Shouldn't the order of the operations be changed like in the other cases?
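
A minimal sketch of the ordering this question refers to, assuming only `java.io.File`: call `delete()` first and consult `exists()` only when it returns false, so that an already-missing file is not reported as a failure. The class and method names are hypothetical and not taken from the PR.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper, not Flink code: illustrates the delete-then-check ordering.
class DeleteOrderSketch {

    /**
     * Returns true if the file is gone after the call, whether this call
     * removed it or it was already missing.
     */
    static boolean deleteQuietly(File file) {
        // delete() returns false both on failure and when the file is absent,
        // so exists() is consulted only to tell those two cases apart.
        if (!file.delete() && file.exists()) {
            System.err.println("Failed to delete " + file.getAbsolutePath());
            return false;
        }
        return true;
    }

    /** Creates a temporary file for the demo and wraps the checked exception. */
    static File newTempFile() {
        try {
            return File.createTempFile("blob-sketch", ".tmp");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        File f = newTempFile();
        System.out.println(deleteQuietly(f)); // the file existed and is removed
        System.out.println(deleteQuietly(f)); // the file is already gone, still no failure
    }
}
```

With the guarded form `exists() && !delete()`, a file removed by another thread between the two calls would be reported as a failed delete; the reordered form avoids that.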



[GitHub] flink issue #4146: [FLINK-6008] collection of BlobServer improvements

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4146
  
    please note that there are four unrelated test failures:
    * 2x Kafka010ITCase
    * 2x YARNSessionCapacitySchedulerITCase



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r125243142
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
    @@ -540,7 +526,7 @@ else if (type == NAME_ADDRESSABLE) {
     					// we should make the local and remote file deletion atomic, otherwise we might risk not
     					// removing the remote file in case of a concurrent put operation
     					if (blobFile.exists() && !blobFile.delete()) {
    --- End diff --
    
    you're right, this could be improved as well - the next PR in this series is removing this code though



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124587248
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
    @@ -509,21 +509,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b
     
     			if (type == CONTENT_ADDRESSABLE) {
     				BlobKey key = BlobKey.readFromInputStream(inputStream);
    -				File blobFile = blobServer.getStorageLocation(key);
    -
    -				writeLock.lock();
    -
    -				try {
    -					// we should make the local and remote file deletion atomic, otherwise we might risk not
    -					// removing the remote file in case of a concurrent put operation
    -					if (blobFile.exists() && !blobFile.delete()) {
    -						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
    --- End diff --
    
    the (included) changes in `BlobServerDeleteTest` should cover this



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4146



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124566819
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -227,7 +227,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) {
     	private static File getJobDirectory(File storageDir, JobID jobID) {
     		final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
     
    -		if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
    --- End diff --
    
    Why did you change the order of those operations? Is that a fix for something? If so, could you explain what's going on here in the commit message?



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124728498
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -227,7 +227,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) {
     	private static File getJobDirectory(File storageDir, JobID jobID) {
     		final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
     
    -		if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
    --- End diff --
    
    OK, please add a comment or a note in the commit message about this.



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124564731
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -593,9 +594,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t
     	 *         the BLOB server or if the BLOB server cannot delete the file
     	 */
     	public void delete(BlobKey key) throws IOException {
    -		if (key == null) {
    -			throw new IllegalArgumentException("BLOB key must not be null");
    -		}
    +		checkArgument(key != null, "BLOB key must not be null.");
    --- End diff --
    
    `requireNonNull`?



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124585333
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -227,7 +227,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) {
     	private static File getJobDirectory(File storageDir, JobID jobID) {
     		final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
     
    -		if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
    --- End diff --
    
    The new way is thread-safe: imagine a directory being created concurrently after another thread has already seen `exists()` return false. I'm not sure we actually fix a bug right now, since the use may be guarded, so this is probably more of a "just-in-case" change in case usage patterns change.
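
A minimal sketch of the reordered check, assuming the new code calls `mkdirs()` first and falls back to `exists()` (the added line is not quoted in this thread). The class name, the helper names, and the choice of `IllegalStateException` are assumptions for illustration only.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical helper, not Flink code: create-then-check ordering for directories.
class JobDirectorySketch {

    /** Ensures the directory exists, tolerating concurrent creation by another thread. */
    static File ensureDirectory(File dir) {
        // mkdirs() returns false when the directory already exists, including when
        // another thread created it a moment ago; exists() then accepts that outcome.
        if (!dir.mkdirs() && !dir.exists()) {
            throw new IllegalStateException("Cannot create directory " + dir.getAbsolutePath());
        }
        return dir;
    }

    /** Creates a temporary parent directory for the demo and wraps the checked exception. */
    static File newTempRoot() {
        try {
            return Files.createTempDirectory("blob-sketch").toFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        File jobDir = new File(newTempRoot(), "job_example");
        ensureDirectory(jobDir); // creates the directory
        ensureDirectory(jobDir); // directory already exists, accepted without error
        System.out.println(jobDir.isDirectory());
    }
}
```

In the old form `!exists() && !mkdirs()`, a thread that observes `exists() == false` and then loses the race gets `false` from `mkdirs()` and fails, although the directory is present.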



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124727900
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
    @@ -509,21 +509,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b
     
     			if (type == CONTENT_ADDRESSABLE) {
     				BlobKey key = BlobKey.readFromInputStream(inputStream);
    -				File blobFile = blobServer.getStorageLocation(key);
    -
    -				writeLock.lock();
    -
    -				try {
    -					// we should make the local and remote file deletion atomic, otherwise we might risk not
    -					// removing the remote file in case of a concurrent put operation
    -					if (blobFile.exists() && !blobFile.delete()) {
    -						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
    --- End diff --
    
    OK, sorry, I didn't find it the first time I was looking for it.



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r125165594
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---
    @@ -92,6 +99,69 @@ public void testDeleteSingle() {
     			catch (IllegalStateException e) {
     				// expected
     			}
    +
    +			// delete a file directly on the server
    +			server.delete(key2);
    +			try {
    +				server.getURL(key2);
    +				fail("BLOB should have been deleted");
    +			}
    +			catch (IOException e) {
    +				// expected
    +			}
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			fail(e.getMessage());
    +		}
    --- End diff --
    
    Can we remove this and simply let the exception be thrown? I think catching an exception, printing the stack trace and then failing with the exception message is an anti-pattern.
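
A small plain-Java sketch of why this pattern loses information. The `fail(String)` method below is a hypothetical stand-in for a test framework's message-only failure, and `operationUnderTest()` is invented for the demo; neither comes from the PR.

```java
import java.io.IOException;

// Hypothetical illustration, not Flink or JUnit code.
class TestFailureSketch {

    /** Stand-in for a message-only fail(): the resulting error carries no cause. */
    static void fail(String message) {
        throw new AssertionError(message);
    }

    /** Invented operation that always fails, to have something to catch. */
    static void operationUnderTest() throws IOException {
        throw new IOException("disk full");
    }

    /** Catch-and-fail: the original exception is reduced to its message. */
    static Throwable catchAndFail() {
        try {
            try {
                operationUnderTest();
            } catch (Exception e) {
                fail(e.getMessage());
            }
        } catch (Throwable t) {
            return t;
        }
        return null;
    }

    /** Letting the exception escape: the caller sees the original type and stack trace. */
    static Throwable propagate() {
        try {
            operationUnderTest();
        } catch (Throwable t) {
            return t;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(catchAndFail().getClass().getSimpleName());
        System.out.println(propagate().getClass().getSimpleName());
    }
}
```

Declaring the test method with `throws Exception` and dropping the outer `catch` gives the second behavior: the test runner reports the original exception directly.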



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124727496
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -593,9 +594,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t
     	 *         the BLOB server or if the BLOB server cannot delete the file
     	 */
     	public void delete(BlobKey key) throws IOException {
    -		if (key == null) {
    -			throw new IllegalArgumentException("BLOB key must not be null");
    -		}
    +		checkArgument(key != null, "BLOB key must not be null.");
    --- End diff --
    
    Yes, but it would be cleaner and less surprising for future users. However, I have no strong feelings about it.



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124584383
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -593,9 +594,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t
     	 *         the BLOB server or if the BLOB server cannot delete the file
     	 */
     	public void delete(BlobKey key) throws IOException {
    -		if (key == null) {
    -			throw new IllegalArgumentException("BLOB key must not be null");
    -		}
    +		checkArgument(key != null, "BLOB key must not be null.");
    --- End diff --
    
    unfortunately, `checkNotNull` throws a `NullPointerException` instead and I did not want to change that here
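
A short sketch of the difference being discussed. The `checkArgument` method here is a local, hypothetical stand-in for a precondition helper that throws `IllegalArgumentException`; `Objects.requireNonNull` is the JDK method, which throws `NullPointerException`.

```java
import java.util.Objects;

// Hypothetical illustration, not Flink code.
class NullCheckSketch {

    /** Local stand-in for a checkArgument-style precondition. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Returns the simple name of the exception raised by the checkArgument-style check. */
    static String exceptionFromCheckArgument(Object key) {
        try {
            checkArgument(key != null, "BLOB key must not be null.");
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    /** Returns the simple name of the exception raised by Objects.requireNonNull. */
    static String exceptionFromRequireNonNull(Object key) {
        try {
            Objects.requireNonNull(key, "BLOB key must not be null.");
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(exceptionFromCheckArgument(null));
        System.out.println(exceptionFromRequireNonNull(null));
    }
}
```

Callers that catch `IllegalArgumentException` would stop matching if the check were switched to a null-check helper, which is the behavior change being avoided here.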



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124730014
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
    @@ -59,6 +60,110 @@
     
     	private final Random rnd = new Random();
     
    +
    +	// --- concurrency tests for utility methods which could fail during the put operation ---
    +
    +	/**
    +	 * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
    +	 */
    +	public static class ContentAddressableGetStorageLocation extends CheckedThread {
    +		private final BlobServer server;
    +		private final BlobKey key;
    +
    +		public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
    +			this.server = server;
    +			this.key = key;
    +		}
    +
    +		@Override
    +		public void go() throws Exception {
    +			server.getStorageLocation(key);
    --- End diff --
    
    So maybe you could call `gets` and `deletes` (if there is such an operation) multiple times in a loop, interleaved? Otherwise I don't see any real value in those tests and would prefer to drop them (so that we don't have to maintain tests that do not check anything).



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r125244675
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---
    @@ -92,6 +99,69 @@ public void testDeleteSingle() {
     			catch (IllegalStateException e) {
     				// expected
     			}
    +
    +			// delete a file directly on the server
    +			server.delete(key2);
    +			try {
    +				server.getURL(key2);
    +				fail("BLOB should have been deleted");
    +			}
    +			catch (IOException e) {
    +				// expected
    +			}
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			fail(e.getMessage());
    +		}
    --- End diff --
    
    Absolutely right - I was told this was an issue some versions ago with JUnit or so... was just copying the code from the other tests in this class. Let me create a cleanup-PR on top of the newest PR in this series, i.e. #4238 



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124568869
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
    @@ -59,6 +60,110 @@
     
     	private final Random rnd = new Random();
     
    +
    +	// --- concurrency tests for utility methods which could fail during the put operation ---
    +
    +	/**
    +	 * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
    +	 */
    +	public static class ContentAddressableGetStorageLocation extends CheckedThread {
    +		private final BlobServer server;
    +		private final BlobKey key;
    +
    +		public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
    +			this.server = server;
    +			this.key = key;
    +		}
    +
    +		@Override
    +		public void go() throws Exception {
    +			server.getStorageLocation(key);
    --- End diff --
    
    Shouldn't you call this at least a couple of times, or a couple of dozen or a couple of hundred times? Otherwise, won't this complete before the next thread starts up?



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124566417
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
    @@ -509,21 +509,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b
     
     			if (type == CONTENT_ADDRESSABLE) {
     				BlobKey key = BlobKey.readFromInputStream(inputStream);
    -				File blobFile = blobServer.getStorageLocation(key);
    -
    -				writeLock.lock();
    -
    -				try {
    -					// we should make the local and remote file deletion atomic, otherwise we might risk not
    -					// removing the remote file in case of a concurrent put operation
    -					if (blobFile.exists() && !blobFile.delete()) {
    -						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
    --- End diff --
    
    Don't you want to add test coverage for not throwing if the delete fails?



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124734798
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
    @@ -59,6 +60,110 @@
     
     	private final Random rnd = new Random();
     
    +
    +	// --- concurrency tests for utility methods which could fail during the put operation ---
    +
    +	/**
    +	 * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
    +	 */
    +	public static class ContentAddressableGetStorageLocation extends CheckedThread {
    +		private final BlobServer server;
    +		private final BlobKey key;
    +
    +		public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
    +			this.server = server;
    +			this.key = key;
    +		}
    +
    +		@Override
    +		public void go() throws Exception {
    +			server.getStorageLocation(key);
    --- End diff --
    
    Unfortunately, concurrency with `delete` operations does not work either if not guarded - the directory may not exist anymore between `jobDirectory.mkdirs()` and `jobDirectory.exists()`. I was able to reproduce the error with the existing test though. If you want to try it, just change the order of these two commands back: the test will not fail every time, but it will sometimes.



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124569928
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -411,12 +411,11 @@ public void delete(BlobKey key) throws IOException {
     		readWriteLock.writeLock().lock();
     
     		try {
    -			if (localFile.exists()) {
    -				if (!localFile.delete()) {
    -					LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
    -				}
    +			if (!localFile.delete() && localFile.exists()) {
    +				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
     			}
     
    +
    --- End diff --
    
    remove extra line



[GitHub] flink pull request #4146: [FLINK-6008][blob] collection of BlobServer improv...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4146#discussion_r124586418
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---
    @@ -59,6 +60,110 @@
     
     	private final Random rnd = new Random();
     
    +
    +	// --- concurrency tests for utility methods which could fail during the put operation ---
    +
    +	/**
    +	 * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
    +	 */
    +	public static class ContentAddressableGetStorageLocation extends CheckedThread {
    +		private final BlobServer server;
    +		private final BlobKey key;
    +
    +		public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
    +			this.server = server;
    +			this.key = key;
    +		}
    +
    +		@Override
    +		public void go() throws Exception {
    +			server.getStorageLocation(key);
    --- End diff --
    
    Actually, the job directory is only created once, so adding more calls doesn't help. What could make this happen more often is adding more threads to `BlobServerPutTest#testServerContentAddressableGetStorageLocationConcurrent()`; otherwise, the failure only happens once in a while if the implementation is not thread-safe.
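
A rough sketch of the "more threads" idea in plain Java, not the test from the PR: many threads are released at the same moment and all try to create the same fresh directory. `ensureDirectory`, `runConcurrently`, and the thread count are hypothetical names and values chosen for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration, not Flink code.
class ConcurrentMkdirsSketch {

    /** Create-then-check ordering; tolerates another thread winning the race. */
    static File ensureDirectory(File dir) {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new IllegalStateException("Cannot create directory " + dir);
        }
        return dir;
    }

    /** Lets numThreads threads create one fresh directory at once; returns the failure count. */
    static int runConcurrently(int numThreads) {
        final File target;
        try {
            target = new File(Files.createTempDirectory("blob-sketch").toFile(), "job_dir");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        CountDownLatch start = new CountDownLatch(1);
        List<Future<File>> results = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            results.add(pool.submit(() -> {
                start.await(); // hold every worker until all tasks are submitted
                return ensureDirectory(target);
            }));
        }
        start.countDown(); // release all workers together

        int failures = 0;
        for (Future<File> result : results) {
            try {
                result.get();
            } catch (Exception e) {
                failures++;
            }
        }
        pool.shutdown();

        // Best-effort cleanup of the demo directories.
        target.delete();
        target.getParentFile().delete();
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("failures: " + runConcurrently(16));
    }
}
```

Because the directory is created only once per run, raising the number of threads (or repeating the whole run with a fresh directory) is what increases the chance of hitting the race; swapping `ensureDirectory` to the check-then-create order would make occasional failures possible.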



[GitHub] flink issue #4146: [FLINK-6008][blob] collection of BlobServer improvements

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4146
  
    merging,

