Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/07/03 12:23:01 UTC

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125249421
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -162,105 +164,116 @@ static File initStorageDirectory(String storageDirectory) throws
     	}
     
     	/**
    -	 * Returns the BLOB service's directory for incoming files. The directory is created if it did
    -	 * not exist so far.
    +	 * Returns the BLOB service's directory for incoming (job-unrelated) files. The directory is
    +	 * created if it does not exist yet.
    +	 *
    +	 * @param storageDir
    +	 * 		storage directory used by the BLOB service
     	 *
    -	 * @return the BLOB server's directory for incoming files
    +	 * @return the BLOB service's directory for incoming files
     	 */
     	static File getIncomingDirectory(File storageDir) {
     		final File incomingDir = new File(storageDir, "incoming");
     
    -		if (!incomingDir.mkdirs() && !incomingDir.exists()) {
    -			throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath());
    -		}
    +		mkdirTolerateExisting(incomingDir, "incoming");
     
     		return incomingDir;
     	}
     
     	/**
    -	 * Returns the BLOB service's directory for cached files. The directory is created if it did
    -	 * not exist so far.
    +	 * Makes sure a given directory exists by creating it if necessary.
     	 *
    -	 * @return the BLOB server's directory for cached files
    +	 * @param dir
    +	 * 		directory to create
    +	 * @param dirType
    +	 * 		the type of the directory (included in error message if something fails)
     	 */
    -	private static File getCacheDirectory(File storageDir) {
    -		final File cacheDirectory = new File(storageDir, "cache");
    -
    -		if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) {
    -			throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'.");
    +	private static void mkdirTolerateExisting(final File dir, final String dirType) {
    +		// note: thread-safe create should try to mkdir first and then ignore the case that the
    +		//       directory already existed
    +		if (!dir.mkdirs() && !dir.exists()) {
    +			throw new RuntimeException(
    +				"Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
     		}
    -
    -		return cacheDirectory;
     	}
     
     	/**
     	 * Returns the (designated) physical storage location of the BLOB with the given key.
     	 *
    +	 * @param storageDir
    +	 * 		storage directory used by the BLOB service
     	 * @param key
    -	 *        the key identifying the BLOB
    +	 * 		the key identifying the BLOB
    +	 * @param jobId
    +	 * 		ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
    +	 *
     	 * @return the (designated) physical storage location of the BLOB
     	 */
    -	static File getStorageLocation(File storageDir, BlobKey key) {
    -		return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString());
    -	}
    +	static File getStorageLocation(
    +			@Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
    +		File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
     
    -	/**
    -	 * Returns the (designated) physical storage location of the BLOB with the given job ID and key.
    -	 *
    -	 * @param jobID
    -	 *        the ID of the job the BLOB belongs to
    -	 * @param key
    -	 *        the key of the BLOB
    -	 * @return the (designated) physical storage location of the BLOB with the given job ID and key
    -	 */
    -	static File getStorageLocation(File storageDir, JobID jobID, String key) {
    -		return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key));
    +		mkdirTolerateExisting(file.getParentFile(), "cache");
    --- End diff --
    
    Why are we creating a `cache` directory here?
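
    To make the question concrete, here is a minimal standalone sketch of the
    helper as it appears in the diff. The class name `MkdirSketch` and the
    `job_1234` / `blob_abcd` path names are hypothetical and only for
    illustration; the real layout produced by `getStorageLocationPath` is not
    shown in this hunk. Per the helper's Javadoc, the `dirType` argument only
    labels the error message, while the directory actually created is whatever
    `file.getParentFile()` resolves to.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class MkdirSketch {

    // Same pattern as in the diff: try to create first, then tolerate the
    // case that the directory already exists (e.g. a concurrent create).
    static void mkdirTolerateExisting(File dir, String dirType) {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new RuntimeException(
                "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
        }
    }

    public static void main(String[] args) throws IOException {
        File root = Files.createTempDirectory("blob-sketch").toFile();

        // Hypothetical storage location; the parent directory is an assumption.
        File blob = new File(new File(root, "job_1234"), "blob_abcd");

        // Creates the parent of the BLOB file. "cache" is only the label that
        // would show up in the exception message if creation failed.
        mkdirTolerateExisting(blob.getParentFile(), "cache");

        System.out.println("created: " + blob.getParentFile().getAbsolutePath());
    }
}
```

    If the parent can be a per-job directory, a failure would still be
    reported as a "cache" directory problem, which may be what makes the
    label look surprising here.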

