You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:07:02 UTC
[09/14] flink git commit: [FLINK-7261][blob] extend BlobStore#get/put
with boolean return values
[FLINK-7261][blob] extend BlobStore#get/put with boolean return values
This way, using code can distinguish non-HA cases, i.e. VoidBlobStore, from
HA cases, i.e. FileSystemBlobStore, in a general way and have better error
reporting.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b57330dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b57330dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b57330dc
Branch: refs/heads/master
Commit: b57330dc7685cfd0ef137c31b1521662bd993413
Parents: 071e27f
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 25 13:00:33 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 5 16:06:27 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/blob/BlobStore.java | 4 +++-
.../java/org/apache/flink/runtime/blob/BlobView.java | 4 +++-
.../flink/runtime/blob/FileSystemBlobStore.java | 15 +++++++++------
.../flink/runtime/blob/PermanentBlobCache.java | 10 ++++++----
.../org/apache/flink/runtime/blob/VoidBlobStore.java | 6 ++++--
5 files changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 38d3a73..1011873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -34,9 +34,11 @@ public interface BlobStore extends BlobView {
* @param localFile The file to copy
* @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey The ID for the file in the blob store
+ *
+ * @return whether the file was copied (<tt>true</tt>) or not (<tt>false</tt>)
* @throws IOException If the copy fails
*/
- void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;
+ boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;
/**
* Tries to delete a blob from storage.
http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
index 8916d95..9083a06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
@@ -34,7 +34,9 @@ public interface BlobView {
* @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey The blob ID
* @param localFile The local file to copy to
+ *
+ * @return whether the file was copied (<tt>true</tt>) or not (<tt>false</tt>)
* @throws IOException If the copy fails
*/
- void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
+ boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 71e5f63..1f9af03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -64,25 +64,26 @@ public class FileSystemBlobStore implements BlobStoreService {
// - Put ------------------------------------------------------------------
@Override
- public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
- put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
+ public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+ return put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
}
- private void put(File fromFile, String toBlobPath) throws IOException {
+ private boolean put(File fromFile, String toBlobPath) throws IOException {
try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) {
LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
Files.copy(fromFile, os);
}
+ return true;
}
// - Get ------------------------------------------------------------------
@Override
- public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
- get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile, blobKey);
+ public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+ return get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile, blobKey);
}
- private void get(String fromBlobPath, File toFile, BlobKey blobKey) throws IOException {
+ private boolean get(String fromBlobPath, File toFile, BlobKey blobKey) throws IOException {
checkNotNull(fromBlobPath, "Blob path");
checkNotNull(toFile, "File");
checkNotNull(blobKey, "Blob key");
@@ -127,6 +128,8 @@ public class FileSystemBlobStore implements BlobStoreService {
} catch (Throwable ignored) {}
}
}
+
+ return true; // success is always true here
}
// - Delete ---------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
index 8582ce3..eaff24c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
@@ -282,11 +282,13 @@ public class PermanentBlobCache extends TimerTask implements PermanentBlobServic
File incomingFile = createTemporaryFilename();
try {
try {
- blobView.get(jobId, blobKey, incomingFile);
- BlobUtils.moveTempFileToStore(
- incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
+ if (blobView.get(jobId, blobKey, incomingFile)) {
+ // now move the temp file to our local cache atomically
+ BlobUtils.moveTempFileToStore(
+ incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
- return localFile;
+ return localFile;
+ }
} catch (Exception e) {
LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b57330dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 0436d1b..2a128df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -29,11 +29,13 @@ import java.io.IOException;
public class VoidBlobStore implements BlobStoreService {
@Override
- public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+ public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+ return false;
}
@Override
- public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+ public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+ return false;
}
@Override