You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:07:02 UTC

[09/14] flink git commit: [FLINK-7261][blob] extend BlobStore#get/put with boolean return values

[FLINK-7261][blob] extend BlobStore#get/put with boolean return values

This way, using code can distinguish non-HA cases, i.e. VoidBlobStore, from
HA cases, i.e. FileSystemBlobStore, in a general way and have better error
reporting.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b57330dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b57330dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b57330dc

Branch: refs/heads/master
Commit: b57330dc7685cfd0ef137c31b1521662bd993413
Parents: 071e27f
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 25 13:00:33 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 5 16:06:27 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/blob/BlobStore.java     |  4 +++-
 .../java/org/apache/flink/runtime/blob/BlobView.java |  4 +++-
 .../flink/runtime/blob/FileSystemBlobStore.java      | 15 +++++++++------
 .../flink/runtime/blob/PermanentBlobCache.java       | 10 ++++++----
 .../org/apache/flink/runtime/blob/VoidBlobStore.java |  6 ++++--
 5 files changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 38d3a73..1011873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -34,9 +34,11 @@ public interface BlobStore extends BlobView {
 	 * @param localFile The file to copy
 	 * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey   The ID for the file in the blob store
+	 *
+	 * @return whether the file was copied (<tt>true</tt>) or not (<tt>false</tt>)
 	 * @throws IOException If the copy fails
 	 */
-	void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;
+	boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;
 
 	/**
 	 * Tries to delete a blob from storage.

http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
index 8916d95..9083a06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
@@ -34,7 +34,9 @@ public interface BlobView {
 	 * @param jobId     ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey   The blob ID
 	 * @param localFile The local file to copy to
+	 *
+	 * @return whether the file was copied (<tt>true</tt>) or not (<tt>false</tt>)
 	 * @throws IOException If the copy fails
 	 */
-	void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
+	boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 71e5f63..1f9af03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -64,25 +64,26 @@ public class FileSystemBlobStore implements BlobStoreService {
 	// - Put ------------------------------------------------------------------
 
 	@Override
-	public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
-		put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
+	public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+		return put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
 	}
 
-	private void put(File fromFile, String toBlobPath) throws IOException {
+	private boolean put(File fromFile, String toBlobPath) throws IOException {
 		try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) {
 			LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
 			Files.copy(fromFile, os);
 		}
+		return true;
 	}
 
 	// - Get ------------------------------------------------------------------
 
 	@Override
-	public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
-		get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile, blobKey);
+	public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+		return get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile, blobKey);
 	}
 
-	private void get(String fromBlobPath, File toFile, BlobKey blobKey) throws IOException {
+	private boolean get(String fromBlobPath, File toFile, BlobKey blobKey) throws IOException {
 		checkNotNull(fromBlobPath, "Blob path");
 		checkNotNull(toFile, "File");
 		checkNotNull(blobKey, "Blob key");
@@ -127,6 +128,8 @@ public class FileSystemBlobStore implements BlobStoreService {
 				} catch (Throwable ignored) {}
 			}
 		}
+
+		return true; // success is always true here
 	}
 
 	// - Delete ---------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
index 8582ce3..eaff24c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
@@ -282,11 +282,13 @@ public class PermanentBlobCache extends TimerTask implements PermanentBlobServic
 		File incomingFile = createTemporaryFilename();
 		try {
 			try {
-				blobView.get(jobId, blobKey, incomingFile);
-				BlobUtils.moveTempFileToStore(
-					incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
+				if (blobView.get(jobId, blobKey, incomingFile)) {
+					// now move the temp file to our local cache atomically
+					BlobUtils.moveTempFileToStore(
+						incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
 
-				return localFile;
+					return localFile;
+				}
 			} catch (Exception e) {
 				LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 0436d1b..2a128df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -29,11 +29,13 @@ import java.io.IOException;
 public class VoidBlobStore implements BlobStoreService {
 
 	@Override
-	public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+	public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+		return false;
 	}
 
 	@Override
-	public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+	public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+		return false;
 	}
 
 	@Override