You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/14 09:18:07 UTC
[1/3] flink git commit: [FLINK-7055][blob] refactor getURL() to the
more generic getFile()
Repository: flink
Updated Branches:
refs/heads/master f78eb0f8b -> 0a19c456a
[FLINK-7055][blob] refactor getURL() to the more generic getFile()
The fact that we always returned URL objects is a relic of the BlobServer's
original, sole use with the URLClassLoader. Since we'd like to extend its use,
returning File objects instead is more generic.
This closes #4236.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7c1dfaa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7c1dfaa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7c1dfaa
Branch: refs/heads/master
Commit: b7c1dfaa3175036287694520fc9bb1649707ef7d
Parents: f78eb0f
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Jun 21 16:14:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Aug 14 08:37:33 2017 +0200
----------------------------------------------------------------------
.../handlers/TaskManagerLogHandler.java | 2 +-
.../apache/flink/runtime/blob/BlobCache.java | 17 ++++++------
.../apache/flink/runtime/blob/BlobServer.java | 18 ++++++-------
.../apache/flink/runtime/blob/BlobService.java | 8 +++---
.../apache/flink/runtime/client/JobClient.java | 2 +-
.../librarycache/BlobLibraryCacheManager.java | 2 +-
.../runtime/blob/BlobCacheRetriesTest.java | 4 +--
.../runtime/blob/BlobCacheSuccessTest.java | 27 ++++++--------------
.../runtime/blob/BlobServerDeleteTest.java | 4 +--
.../BlobLibraryCacheManagerTest.java | 8 +++---
.../BlobLibraryCacheRecoveryITCase.java | 6 ++---
11 files changed, 44 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index b7fee2d..f175573 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -208,7 +208,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
lastSubmittedFile.put(taskManagerID, blobKey);
}
try {
- return blobCache.getURL(blobKey).getFile();
+ return blobCache.getFile(blobKey).getAbsolutePath();
} catch (IOException e) {
throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 32bd8fd..3e19537 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
-import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -39,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The BLOB cache implements a local cache for content-addressable BLOBs.
*
- * <p>When requesting BLOBs through the {@link BlobCache#getURL} methods, the
+ * <p>When requesting BLOBs through the {@link BlobCache#getFile(BlobKey)} method, the
* BLOB cache will first attempt to serve the file from its local cache. Only if
* the local cache does not contain the desired BLOB, the BLOB cache will try to
* download it from a distributed file system (if available) or the BLOB
@@ -111,21 +110,22 @@ public final class BlobCache implements BlobService {
}
/**
- * Returns the URL for the BLOB with the given key. The method will first attempt to serve
+ * Returns local copy of the file for the BLOB with the given key. The method will first attempt to serve
* the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
* from this cache's BLOB server.
*
* @param requiredBlob The key of the desired BLOB.
- * @return URL referring to the local storage location of the BLOB.
+ * @return file referring to the local storage location of the BLOB.
* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
*/
- public URL getURL(final BlobKey requiredBlob) throws IOException {
+ @Override
+ public File getFile(final BlobKey requiredBlob) throws IOException {
checkArgument(requiredBlob != null, "BLOB key cannot be null.");
final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
if (localJarFile.exists()) {
- return localJarFile.toURI().toURL();
+ return localJarFile;
}
// first try the distributed blob store (if available)
@@ -136,7 +136,7 @@ public final class BlobCache implements BlobService {
}
if (localJarFile.exists()) {
- return localJarFile.toURI().toURL();
+ return localJarFile;
}
// fallback: download from the BlobServer
@@ -160,7 +160,7 @@ public final class BlobCache implements BlobService {
}
// success, we finished
- return localJarFile.toURI().toURL();
+ return localJarFile;
}
catch (Throwable t) {
String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
@@ -188,6 +188,7 @@ public final class BlobCache implements BlobService {
* Deletes the file associated with the given key from the BLOB cache.
* @param key referring to the file to be deleted
*/
+ @Override
public void delete(BlobKey key) throws IOException{
final File localFile = BlobUtils.getStorageLocation(storageDir, key);
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ecb4527..add9f7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -34,7 +34,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -334,22 +333,23 @@ public class BlobServer extends Thread implements BlobService {
}
/**
- * Method which retrieves the URL of a file associated with a blob key. The blob server looks
- * the blob key up in its local storage. If the file exists, then the URL is returned. If the
- * file does not exist, then a FileNotFoundException is thrown.
+ * Method which retrieves the local path of a file associated with a blob key. The blob server
+ * looks the blob key up in its local storage. If the file exists, it is returned. If the
+ * file does not exist, it is retrieved from the HA blob store (if available) or a
+ * FileNotFoundException is thrown.
*
* @param requiredBlob blob key associated with the requested file
- * @return URL of the file
- * @throws IOException
+ * @return file referring to the local storage location of the BLOB.
+ * @throws IOException Thrown if the file retrieval failed.
*/
@Override
- public URL getURL(BlobKey requiredBlob) throws IOException {
+ public File getFile(BlobKey requiredBlob) throws IOException {
checkArgument(requiredBlob != null, "BLOB key cannot be null.");
final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
if (localFile.exists()) {
- return localFile.toURI().toURL();
+ return localFile;
}
else {
try {
@@ -361,7 +361,7 @@ public class BlobServer extends Thread implements BlobService {
}
if (localFile.exists()) {
- return localFile.toURI().toURL();
+ return localFile;
}
else {
throw new FileNotFoundException("Local file " + localFile + " does not exist " +
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index c1447c8..1e56f26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.blob;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
-import java.net.URL;
/**
* A simple store and retrieve binary large objects (BLOBs).
@@ -28,14 +28,14 @@ import java.net.URL;
public interface BlobService extends Closeable {
/**
- * Returns the URL of the file associated with the provided blob key.
+ * Returns the path to a local copy of the file associated with the provided blob key.
*
* @param key blob key associated with the requested file
- * @return The URL to the file.
+ * @return The path to the file.
* @throws java.io.FileNotFoundException when the path does not exist;
* @throws IOException if any other error occurs when retrieving the file
*/
- URL getURL(BlobKey key) throws IOException;
+ File getFile(BlobKey key) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 19f0e2c..e3657ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -234,7 +234,7 @@ public class JobClient {
int pos = 0;
for (BlobKey blobKey : props.requiredJarFiles()) {
try {
- allURLs[pos++] = blobClient.getURL(blobKey);
+ allURLs[pos++] = blobClient.getFile(blobKey).toURI().toURL();
} catch (Exception e) {
try {
blobClient.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 0387725..9aff6f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -254,7 +254,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
// it is important that we fetch the URL before increasing the counter.
// in case the URL cannot be created (failed to fetch the BLOB), we have no stale counter
try {
- URL url = blobService.getURL(key);
+ URL url = blobService.getFile(key).toURI().toURL();
Integer references = blobKeyReferenceCounters.get(key);
int newReferences = references == null ? 1 : references + 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 366b592..fe763fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -115,7 +115,7 @@ public class BlobCacheRetriesTest {
cache = new BlobCache(serverAddress, config, new VoidBlobStore());
// trigger a download - it should fail the first two times, but retry, and succeed eventually
- URL url = cache.getURL(key);
+ URL url = cache.getFile(key).toURI().toURL();
InputStream is = url.openStream();
try {
byte[] received = new byte[data.length];
@@ -211,7 +211,7 @@ public class BlobCacheRetriesTest {
// trigger a download - it should fail eventually
try {
- cache.getURL(key);
+ cache.getFile(key);
fail("This should fail");
}
catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 51be1b0..d06f76f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -30,15 +30,12 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* This class contains unit tests for the {@link BlobCache}.
@@ -175,7 +172,7 @@ public class BlobCacheSuccessTest {
blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService);
for (BlobKey blobKey : blobKeys) {
- blobCache.getURL(blobKey);
+ blobCache.getFile(blobKey);
}
if (blobServer != null) {
@@ -184,28 +181,20 @@ public class BlobCacheSuccessTest {
blobServer = null;
}
- final URL[] urls = new URL[blobKeys.size()];
+ final File[] files = new File[blobKeys.size()];
for(int i = 0; i < blobKeys.size(); i++){
- urls[i] = blobCache.getURL(blobKeys.get(i));
+ files[i] = blobCache.getFile(blobKeys.get(i));
}
// Verify the result
- assertEquals(blobKeys.size(), urls.length);
+ assertEquals(blobKeys.size(), files.length);
- for (final URL url : urls) {
+ for (final File file : files) {
+ assertNotNull(file);
- assertNotNull(url);
-
- try {
- final File cachedFile = new File(url.toURI());
-
- assertTrue(cachedFile.exists());
- assertEquals(buf.length, cachedFile.length());
-
- } catch (URISyntaxException e) {
- fail(e.getMessage());
- }
+ assertTrue(file.exists());
+ assertEquals(buf.length, file.length());
}
} finally {
if (blobServer != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 5db9568..ce4574b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -109,7 +109,7 @@ public class BlobServerDeleteTest extends TestLogger {
// delete a file directly on the server
server.delete(key2);
try {
- server.getURL(key2);
+ server.getFile(key2);
fail("BLOB should have been deleted");
}
catch (IOException e) {
@@ -209,7 +209,7 @@ public class BlobServerDeleteTest extends TestLogger {
server.delete(key);
// the file should still be there
- server.getURL(key);
+ server.getFile(key);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 606d8c9..476fdcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -108,13 +108,13 @@ public class BlobLibraryCacheManagerTest {
assertEquals(0, checkFilesExist(keys, server, false));
try {
- server.getURL(keys.get(0));
+ server.getFile(keys.get(0));
fail("name-addressable BLOB should have been deleted");
} catch (IOException e) {
// expected
}
try {
- server.getURL(keys.get(1));
+ server.getFile(keys.get(1));
fail("name-addressable BLOB should have been deleted");
} catch (IOException e) {
// expected
@@ -150,7 +150,7 @@ public class BlobLibraryCacheManagerTest {
* @param doThrow
* whether exceptions should be ignored (<tt>false</tt>), or throws (<tt>true</tt>)
*
- * @return number of files we were able to retrieve via {@link BlobService#getURL(BlobKey)}
+ * @return number of files we were able to retrieve via {@link BlobService#getFile(BlobKey)}
*/
private static int checkFilesExist(
List<BlobKey> keys, BlobService blobService, boolean doThrow)
@@ -159,7 +159,7 @@ public class BlobLibraryCacheManagerTest {
for (BlobKey key : keys) {
try {
- blobService.getURL(key);
+ blobService.getFile(key);
++numFiles;
} catch (IOException e) {
if (doThrow) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index e5efd19..b19835b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -107,7 +107,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
// Verify key 1
- File f = new File(cache.getURL(keys.get(0)).toURI());
+ File f = cache.getFile(keys.get(0));
assertEquals(expected.length, f.length());
try (FileInputStream fis = new FileInputStream(f)) {
@@ -126,7 +126,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
// Verify key 1
- f = new File(cache.getURL(keys.get(0)).toURI());
+ f = cache.getFile(keys.get(0));
assertEquals(expected.length, f.length());
try (FileInputStream fis = new FileInputStream(f)) {
@@ -138,7 +138,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
}
// Verify key 2
- f = new File(cache.getURL(keys.get(1)).toURI());
+ f = cache.getFile(keys.get(1));
assertEquals(256, f.length());
try (FileInputStream fis = new FileInputStream(f)) {
[2/3] flink git commit: [FLINK-7056][blob] add API to allow
job-related BLOBs to be stored
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index d06f76f..1216be2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -18,10 +18,12 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
@@ -40,69 +42,123 @@ import static org.junit.Assert.assertTrue;
/**
* This class contains unit tests for the {@link BlobCache}.
*/
-public class BlobCacheSuccessTest {
+public class BlobCacheSuccessTest extends TestLogger {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
- * BlobCache with no HA. BLOBs need to be downloaded form a working
+ * BlobCache with no HA, job-unrelated BLOBs. BLOBs need to be downloaded form a working
* BlobServer.
*/
@Test
- public void testBlobCache() throws IOException {
+ public void testBlobNoJobCache() throws IOException {
Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
- uploadFileGetTest(config, false, false);
+ uploadFileGetTest(config, null, false, false);
+ }
+
+ /**
+ * BlobCache with no HA, job-related BLOBS. BLOBs need to be downloaded form a working
+ * BlobServer.
+ */
+ @Test
+ public void testBlobForJobCache() throws IOException {
+ Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ temporaryFolder.newFolder().getAbsolutePath());
+
+ uploadFileGetTest(config, new JobID(), false, false);
}
/**
* BlobCache is configured in HA mode and the cache can download files from
* the file system directly and does not need to download BLOBs from the
- * BlobServer.
+ * BlobServer. Using job-unrelated BLOBs.
+ */
+ @Test
+ public void testBlobNoJobCacheHa() throws IOException {
+ testBlobCacheHa(null);
+ }
+
+ /**
+ * BlobCache is configured in HA mode and the cache can download files from
+ * the file system directly and does not need to download BLOBs from the
+ * BlobServer. Using job-related BLOBs.
*/
@Test
- public void testBlobCacheHa() throws IOException {
+ public void testBlobForJobCacheHa() throws IOException {
+ testBlobCacheHa(new JobID());
+ }
+
+ private void testBlobCacheHa(final JobID jobId) throws IOException {
Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getPath());
- uploadFileGetTest(config, true, true);
+ uploadFileGetTest(config, jobId, true, true);
}
/**
* BlobCache is configured in HA mode and the cache can download files from
* the file system directly and does not need to download BLOBs from the
- * BlobServer.
+ * BlobServer. Using job-unrelated BLOBs.
+ */
+ @Test
+ public void testBlobNoJobCacheHa2() throws IOException {
+ testBlobCacheHa2(null);
+ }
+
+ /**
+ * BlobCache is configured in HA mode and the cache can download files from
+ * the file system directly and does not need to download BLOBs from the
+ * BlobServer. Using job-related BLOBs.
*/
@Test
- public void testBlobCacheHa2() throws IOException {
+ public void testBlobForJobCacheHa2() throws IOException {
+ testBlobCacheHa2(new JobID());
+ }
+
+ private void testBlobCacheHa2(JobID jobId) throws IOException {
Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getPath());
- uploadFileGetTest(config, false, true);
+ uploadFileGetTest(config, jobId, false, true);
}
/**
* BlobCache is configured in HA mode but the cache itself cannot access the
- * file system and thus needs to download BLOBs from the BlobServer.
+ * file system and thus needs to download BLOBs from the BlobServer. Using job-unrelated BLOBs.
*/
@Test
- public void testBlobCacheHaFallback() throws IOException {
+ public void testBlobNoJobCacheHaFallback() throws IOException {
+ testBlobCacheHaFallback(null);
+ }
+
+ /**
+ * BlobCache is configured in HA mode but the cache itself cannot access the
+ * file system and thus needs to download BLOBs from the BlobServer. Using job-related BLOBs.
+ */
+ @Test
+ public void testBlobForJobCacheHaFallback() throws IOException {
+ testBlobCacheHaFallback(new JobID());
+ }
+
+ private void testBlobCacheHaFallback(final JobID jobId) throws IOException {
Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getPath());
- uploadFileGetTest(config, false, false);
+ uploadFileGetTest(config, jobId, false, false);
}
/**
@@ -119,7 +175,7 @@ public class BlobCacheSuccessTest {
* whether the cache should have access to a shared <tt>HA_STORAGE_PATH</tt> (only useful with
* HA mode)
*/
- private void uploadFileGetTest(final Configuration config, boolean shutdownServerAfterUpload,
+ private void uploadFileGetTest(final Configuration config, JobID jobId, boolean shutdownServerAfterUpload,
boolean cacheHasAccessToFs) throws IOException {
Preconditions.checkArgument(!shutdownServerAfterUpload || cacheHasAccessToFs);
@@ -154,9 +210,9 @@ public class BlobCacheSuccessTest {
blobClient = new BlobClient(serverAddress, config);
- blobKeys.add(blobClient.put(buf));
+ blobKeys.add(blobClient.put(jobId, buf));
buf[0] = 1; // Make sure the BLOB key changes
- blobKeys.add(blobClient.put(buf));
+ blobKeys.add(blobClient.put(jobId, buf));
} finally {
if (blobClient != null) {
blobClient.close();
@@ -172,7 +228,11 @@ public class BlobCacheSuccessTest {
blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService);
for (BlobKey blobKey : blobKeys) {
- blobCache.getFile(blobKey);
+ if (jobId == null) {
+ blobCache.getFile(blobKey);
+ } else {
+ blobCache.getFile(jobId, blobKey);
+ }
}
if (blobServer != null) {
@@ -184,7 +244,11 @@ public class BlobCacheSuccessTest {
final File[] files = new File[blobKeys.size()];
for(int i = 0; i < blobKeys.size(); i++){
- files[i] = blobCache.getFile(blobKeys.get(i));
+ if (jobId == null) {
+ files[i] = blobCache.getFile(blobKeys.get(i));
+ } else {
+ files[i] = blobCache.getFile(jobId, blobKeys.get(i));
+ }
}
// Verify the result
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 2932f41..cfec4c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -38,6 +38,7 @@ import java.security.MessageDigest;
import java.util.Collections;
import java.util.List;
+import org.apache.flink.api.common.JobID;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -144,7 +145,7 @@ public class BlobClientTest {
* @throws IOException
* thrown if an I/O error occurs while reading the input stream
*/
- private static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException {
+ static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException {
byte[] receivedBuffer = new byte[buf.length];
int bytesReceived = 0;
@@ -220,13 +221,20 @@ public class BlobClientTest {
InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
client = new BlobClient(serverAddress, getBlobClientConfig());
+ JobID jobId = new JobID();
+
// Store the data
- BlobKey receivedKey = client.put(testBuffer);
+ BlobKey receivedKey = client.put(null, testBuffer);
+ assertEquals(origKey, receivedKey);
+ // try again with a job-related BLOB:
+ receivedKey = client.put(jobId, testBuffer);
assertEquals(origKey, receivedKey);
// Retrieve the data
InputStream is = client.get(receivedKey);
validateGet(is, testBuffer);
+ is = client.get(jobId, receivedKey);
+ validateGet(is, testBuffer);
// Check reaction to invalid keys
try {
@@ -236,6 +244,15 @@ public class BlobClientTest {
catch (IOException fnfe) {
// expected
}
+ // new client needed (closed from failure above)
+ client = new BlobClient(serverAddress, getBlobClientConfig());
+ try {
+ client.get(jobId, new BlobKey());
+ fail("Expected IOException did not occur");
+ }
+ catch (IOException fnfe) {
+ // expected
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -276,10 +293,16 @@ public class BlobClientTest {
InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
client = new BlobClient(serverAddress, getBlobClientConfig());
+ JobID jobId = new JobID();
+
// Store the data
is = new FileInputStream(testFile);
BlobKey receivedKey = client.put(is);
assertEquals(origKey, receivedKey);
+ // try again with a job-related BLOB:
+ is = new FileInputStream(testFile);
+ receivedKey = client.put(jobId, is);
+ assertEquals(origKey, receivedKey);
is.close();
is = null;
@@ -287,6 +310,8 @@ public class BlobClientTest {
// Retrieve the data
is = client.get(receivedKey);
validateGet(is, testFile);
+ is = client.get(jobId, receivedKey);
+ validateGet(is, testFile);
}
catch (Exception e) {
e.printStackTrace();
@@ -324,6 +349,13 @@ public class BlobClientTest {
InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
+ uploadJarFile(serverAddress, blobClientConfig, testFile);
+ uploadJarFile(serverAddress, blobClientConfig, testFile);
+ }
+
+ private static void uploadJarFile(
+ final InetSocketAddress serverAddress, final Configuration blobClientConfig,
+ final File testFile) throws IOException {
List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
Collections.singletonList(new Path(testFile.toURI())));
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 3c7711d..81304f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -94,11 +94,17 @@ public class BlobRecoveryITCase extends TestLogger {
BlobKey[] keys = new BlobKey[2];
- // Put data
- keys[0] = client.put(expected); // Request 1
- keys[1] = client.put(expected, 32, 256); // Request 2
+ // Put job-unrelated data
+ keys[0] = client.put(null, expected); // Request 1
+ keys[1] = client.put(null, expected, 32, 256); // Request 2
+ // Put job-related data, verify that the checksums match
JobID[] jobId = new JobID[] { new JobID(), new JobID() };
+ BlobKey key;
+ key = client.put(jobId[0], expected); // Request 3
+ assertEquals(keys[0], key);
+ key = client.put(jobId[1], expected, 32, 256); // Request 4
+ assertEquals(keys[1], key);
// check that the storage directory exists
final Path blobServerPath = new Path(storagePath, "blob");
@@ -130,9 +136,31 @@ public class BlobRecoveryITCase extends TestLogger {
}
}
+ // Verify request 3
+ try (InputStream is = client.get(jobId[0], keys[0])) {
+ byte[] actual = new byte[expected.length];
+ BlobUtils.readFully(is, actual, 0, expected.length, null);
+
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i], actual[i]);
+ }
+ }
+
+ // Verify request 4
+ try (InputStream is = client.get(jobId[1], keys[1])) {
+ byte[] actual = new byte[256];
+ BlobUtils.readFully(is, actual, 0, 256, null);
+
+ for (int i = 32, j = 0; i < 256; i++, j++) {
+ assertEquals(expected[i], actual[j]);
+ }
+ }
+
// Remove again
client.delete(keys[0]);
client.delete(keys[1]);
+ client.delete(jobId[0], keys[0]);
+ client.delete(jobId[1], keys[1]);
// Verify everything is clean
assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index ce4574b..d91aae42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
@@ -39,6 +40,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
@@ -75,35 +77,45 @@ public class BlobServerDeleteTest extends TestLogger {
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- // put content addressable (like libraries)
- BlobKey key = client.put(data);
- assertNotNull(key);
+ // put job-unrelated (like libraries)
+ BlobKey key1 = client.put(null, data);
+ assertNotNull(key1);
- // second item
+ // second job-unrelated item
data[0] ^= 1;
- BlobKey key2 = client.put(data);
+ BlobKey key2 = client.put(null, data);
assertNotNull(key2);
- assertNotEquals(key, key2);
+ assertNotEquals(key1, key2);
+
+ // put job-related with same key1 as non-job-related
+ data[0] ^= 1; // back to the original data
+ final JobID jobId = new JobID();
+ BlobKey key1b = client.put(jobId, data);
+ assertNotNull(key1b);
+ assertEquals(key1, key1b);
// issue a DELETE request via the client
- client.delete(key);
+ client.delete(key1);
client.close();
client = new BlobClient(serverAddress, config);
try {
- client.get(key);
+ client.get(key1);
fail("BLOB should have been deleted");
}
catch (IOException e) {
// expected
}
+ ensureClientIsClosed(client);
+
+ client = new BlobClient(serverAddress, config);
try {
- client.put(new byte[1]);
- fail("client should be closed after erroneous operation");
+ client.get(jobId, key1);
}
- catch (IllegalStateException e) {
+ catch (IOException e) {
+ // not expected: the job-related BLOB must still be retrievable
+ fail("Deleting a job-unrelated BLOB should not affect a job-related BLOB with the same key");
}
// delete a file directly on the server
@@ -125,8 +137,29 @@ public class BlobServerDeleteTest extends TestLogger {
}
}
+ private static void ensureClientIsClosed(final BlobClient client) throws IOException {
+ try {
+ client.put(null, new byte[1]);
+ fail("client should be closed after erroneous operation");
+ }
+ catch (IllegalStateException e) {
+ // expected
+ } finally {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testDeleteAlreadyDeletedNoJob() {
+ testDeleteAlreadyDeleted(null);
+ }
+
@Test
- public void testDeleteAlreadyDeletedByBlobKey() {
+ public void testDeleteAlreadyDeletedForJob() {
+ testDeleteAlreadyDeleted(new JobID());
+ }
+
+ private void testDeleteAlreadyDeleted(final JobID jobId) {
BlobServer server = null;
BlobClient client = null;
BlobStore blobStore = new VoidBlobStore();
@@ -143,23 +176,27 @@ public class BlobServerDeleteTest extends TestLogger {
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- // put content addressable (like libraries)
- BlobKey key = client.put(data);
+ // put file
+ BlobKey key = client.put(jobId, data);
assertNotNull(key);
- File blobFile = server.getStorageLocation(key);
+ File blobFile = server.getStorageLocation(jobId, key);
assertTrue(blobFile.delete());
// issue a DELETE request via the client
try {
- client.delete(key);
+ deleteHelper(client, jobId, key);
}
catch (IOException e) {
fail("DELETE operation should not fail if file is already deleted");
}
// issue a DELETE request on the server
- server.delete(key);
+ if (jobId == null) {
+ server.delete(key);
+ } else {
+ server.delete(jobId, key);
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -170,8 +207,25 @@ public class BlobServerDeleteTest extends TestLogger {
}
}
+ private static void deleteHelper(BlobClient client, JobID jobId, BlobKey key) throws IOException {
+ if (jobId == null) {
+ client.delete(key);
+ } else {
+ client.delete(jobId, key);
+ }
+ }
+
@Test
- public void testDeleteByBlobKeyFails() {
+ public void testDeleteFailsNoJob() {
+ testDeleteFails(null);
+ }
+
+ @Test
+ public void testDeleteFailsForJob() {
+ testDeleteFails(new JobID());
+ }
+
+ private void testDeleteFails(final JobID jobId) {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
BlobServer server = null;
@@ -193,29 +247,39 @@ public class BlobServerDeleteTest extends TestLogger {
rnd.nextBytes(data);
// put content addressable (like libraries)
- BlobKey key = client.put(data);
+ BlobKey key = client.put(jobId, data);
assertNotNull(key);
- blobFile = server.getStorageLocation(key);
+ blobFile = server.getStorageLocation(jobId, key);
directory = blobFile.getParentFile();
assertTrue(blobFile.setWritable(false, false));
assertTrue(directory.setWritable(false, false));
// issue a DELETE request via the client
- client.delete(key);
+ deleteHelper(client, jobId, key);
// issue a DELETE request on the server
- server.delete(key);
+ if (jobId == null) {
+ server.delete(key);
+ } else {
+ server.delete(jobId, key);
+ }
// the file should still be there
- server.getFile(key);
+ if (jobId == null) {
+ server.getFile(key);
+ } else {
+ server.getFile(jobId, key);
+ }
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
if (blobFile != null && directory != null) {
+ //noinspection ResultOfMethodCallIgnored
blobFile.setWritable(true, false);
+ //noinspection ResultOfMethodCallIgnored
directory.setWritable(true, false);
}
cleanup(server, client);
@@ -233,10 +297,29 @@ public class BlobServerDeleteTest extends TestLogger {
* broken.
*/
@Test
- public void testConcurrentDeleteOperations() throws IOException, ExecutionException, InterruptedException {
+ public void testConcurrentDeleteOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+ testConcurrentDeleteOperations(null);
+ }
+
+ /**
+ * FLINK-6020
+ *
+ * Tests that concurrent delete operations don't interfere with each other.
+ *
+ * Note: The test checks that there cannot be two threads which have checked whether a given blob file exists
+ * and then one of them fails deleting it. Without the introduced lock, this situation should rarely happen
+ * and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is most likely
+ * broken.
+ */
+ @Test
+ public void testConcurrentDeleteOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+ testConcurrentDeleteOperations(new JobID());
+ }
+
+ private void testConcurrentDeleteOperations(final JobID jobId)
+ throws IOException, InterruptedException, ExecutionException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
-
final BlobStore blobStore = mock(BlobStore.class);
final int concurrentDeleteOperations = 3;
@@ -251,16 +334,16 @@ public class BlobServerDeleteTest extends TestLogger {
final BlobKey blobKey;
try (BlobClient client = blobServer.createClient()) {
- blobKey = client.put(data);
+ blobKey = client.put(jobId, data);
}
- assertTrue(blobServer.getStorageLocation(blobKey).exists());
+ assertTrue(blobServer.getStorageLocation(jobId, blobKey).exists());
for (int i = 0; i < concurrentDeleteOperations; i++) {
CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync(
() -> {
try (BlobClient blobClient = blobServer.createClient()) {
- blobClient.delete(blobKey);
+ deleteHelper(blobClient, jobId, blobKey);
} catch (IOException e) {
throw new FlinkFutureException("Could not delete the given blob key " + blobKey + '.', e);
}
@@ -278,13 +361,13 @@ public class BlobServerDeleteTest extends TestLogger {
// in case of no lock, one of the delete operations should eventually fail
waitFuture.get();
- assertFalse(blobServer.getStorageLocation(blobKey).exists());
+ assertFalse(blobServer.getStorageLocation(jobId, blobKey).exists());
} finally {
executor.shutdownNow();
}
}
- private void cleanup(BlobServer server, BlobClient client) {
+ private static void cleanup(BlobServer server, BlobClient client) {
if (client != null) {
try {
client.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index bd27d70..5ad8d95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
@@ -69,7 +70,28 @@ public class BlobServerGetTest extends TestLogger {
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
- public void testGetFailsDuringLookup() throws IOException {
+ public void testGetFailsDuringLookup1() throws IOException {
+ testGetFailsDuringLookup(null, new JobID());
+ }
+
+ @Test
+ public void testGetFailsDuringLookup2() throws IOException {
+ testGetFailsDuringLookup(new JobID(), new JobID());
+ }
+
+ @Test
+ public void testGetFailsDuringLookup3() throws IOException {
+ testGetFailsDuringLookup(new JobID(), null);
+ }
+
+ /**
+ * Checks the correct result if a GET operation fails during the lookup of the file.
+ *
+ * @param jobId1 first job ID or <tt>null</tt> if job-unrelated
+ * @param jobId2 second job ID different from <tt>jobId1</tt>
+ */
+ private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2)
+ throws IOException {
BlobServer server = null;
BlobClient client = null;
@@ -86,20 +108,29 @@ public class BlobServerGetTest extends TestLogger {
rnd.nextBytes(data);
// put content addressable (like libraries)
- BlobKey key = client.put(data);
+ BlobKey key = client.put(jobId1, data);
assertNotNull(key);
- // delete all files to make sure that GET requests fail
- File blobFile = server.getStorageLocation(key);
+ // delete file to make sure that GET requests fail
+ File blobFile = server.getStorageLocation(jobId1, key);
assertTrue(blobFile.delete());
// issue a GET request that fails
- try {
- client.get(key);
- fail("This should not succeed.");
- } catch (IOException e) {
- // expected
- }
+ client = verifyDeleted(client, jobId1, key, serverAddress, config);
+
+ BlobKey key2 = client.put(jobId2, data);
+ assertNotNull(key2);
+ assertEquals(key, key2);
+ // request for jobId2 should succeed (consume the stream to keep the connection in sync)
+ BlobClientTest.validateGet(getFileHelper(client, jobId2, key), data);
+ // request for jobId1 should still fail
+ client = verifyDeleted(client, jobId1, key, serverAddress, config);
+
+ // same checks as for jobId1 but for jobId2 should also work:
+ blobFile = server.getStorageLocation(jobId2, key);
+ assertTrue(blobFile.delete());
+ client = verifyDeleted(client, jobId2, key, serverAddress, config);
+
} finally {
if (client != null) {
client.close();
@@ -110,8 +141,51 @@ public class BlobServerGetTest extends TestLogger {
}
}
+ /**
+ * Checks that the given blob does not exist anymore.
+ *
+ * @param client
+ * BLOB client to use for connecting to the BLOB server
+ * @param jobId
+ * job ID or <tt>null</tt> if job-unrelated
+ * @param key
+ * key identifying the BLOB to request
+ * @param serverAddress
+ * BLOB server address
+ * @param config
+ * client config
+ *
+ * @return a new client (since the old one is being closed on failure)
+ */
+ private static BlobClient verifyDeleted(
+ BlobClient client, JobID jobId, BlobKey key,
+ InetSocketAddress serverAddress, Configuration config) throws IOException {
+ try {
+ getFileHelper(client, jobId, key);
+ fail("This should not succeed.");
+ } catch (IOException e) {
+ // expected
+ }
+ // need a new client (the old one was closed due to the failure)
+ return new BlobClient(serverAddress, config);
+ }
+
+ @Test
+ public void testGetFailsDuringStreamingNoJob() throws IOException {
+ testGetFailsDuringStreaming(null);
+ }
+
@Test
- public void testGetFailsDuringStreaming() throws IOException {
+ public void testGetFailsDuringStreamingForJob() throws IOException {
+ testGetFailsDuringStreaming(new JobID());
+ }
+
+ /**
+ * Checks the correct result if a GET operation fails during the file download.
+ *
+ * @param jobId job ID or <tt>null</tt> if job-unrelated
+ */
+ private void testGetFailsDuringStreaming(final JobID jobId) throws IOException {
BlobServer server = null;
BlobClient client = null;
@@ -128,11 +202,11 @@ public class BlobServerGetTest extends TestLogger {
rnd.nextBytes(data);
// put content addressable (like libraries)
- BlobKey key = client.put(data);
+ BlobKey key = client.put(jobId, data);
assertNotNull(key);
// issue a GET request that succeeds
- InputStream is = client.get(key);
+ InputStream is = getFileHelper(client, jobId, key);
byte[] receiveBuffer = new byte[data.length];
int firstChunkLen = 50000;
@@ -169,8 +243,22 @@ public class BlobServerGetTest extends TestLogger {
* Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
*/
@Test
- public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException {
+ public void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+ testConcurrentGetOperations(null);
+ }
+ /**
+ * FLINK-6020
+ *
+ * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
+ */
+ @Test
+ public void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+ testConcurrentGetOperations(new JobID());
+ }
+
+ private void testConcurrentGetOperations(final JobID jobId)
+ throws IOException, InterruptedException, ExecutionException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -191,14 +279,14 @@ public class BlobServerGetTest extends TestLogger {
new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
- File targetFile = (File) invocation.getArguments()[1];
+ File targetFile = (File) invocation.getArguments()[2];
FileUtils.copyInputStreamToFile(bais, targetFile);
return null;
}
}
- ).when(blobStore).get(any(BlobKey.class), any(File.class));
+ ).when(blobStore).get(any(JobID.class), any(BlobKey.class), any(File.class));
final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
@@ -207,7 +295,7 @@ public class BlobServerGetTest extends TestLogger {
CompletableFuture<InputStream> getOperation = CompletableFuture.supplyAsync(
() -> {
try (BlobClient blobClient = blobServer.createClient();
- InputStream inputStream = blobClient.get(blobKey)) {
+ InputStream inputStream = getFileHelper(blobClient, jobId, blobKey)) {
byte[] buffer = new byte[data.length];
IOUtils.readFully(inputStream, buffer);
@@ -241,9 +329,18 @@ public class BlobServerGetTest extends TestLogger {
}
// verify that we downloaded the requested blob exactly once from the BlobStore
- verify(blobStore, times(1)).get(eq(blobKey), any(File.class));
+ verify(blobStore, times(1)).get(eq(jobId), eq(blobKey), any(File.class));
} finally {
executor.shutdownNow();
}
}
+
+ static InputStream getFileHelper(BlobClient blobClient, JobID jobId, BlobKey blobKey)
+ throws IOException {
+ if (jobId == null) {
+ return blobClient.get(blobKey);
+ } else {
+ return blobClient.get(jobId, blobKey);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index c479167..f55adb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static org.apache.flink.runtime.blob.BlobServerGetTest.getFileHelper;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -71,28 +73,43 @@ public class BlobServerPutTest extends TestLogger {
// --- concurrency tests for utility methods which could fail during the put operation ---
/**
- * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+ * Checked thread that calls {@link BlobServer#getStorageLocation(JobID, BlobKey)}.
*/
public static class ContentAddressableGetStorageLocation extends CheckedThread {
private final BlobServer server;
+ private final JobID jobId;
private final BlobKey key;
- public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+ public ContentAddressableGetStorageLocation(BlobServer server, JobID jobId, BlobKey key) {
this.server = server;
+ this.jobId = jobId;
this.key = key;
}
@Override
public void go() throws Exception {
- server.getStorageLocation(key);
+ server.getStorageLocation(jobId, key);
}
}
/**
- * Tests concurrent calls to {@link BlobServer#getStorageLocation(BlobKey)}.
+ * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, BlobKey)}.
*/
@Test
- public void testServerContentAddressableGetStorageLocationConcurrent() throws Exception {
+ public void testServerContentAddressableGetStorageLocationConcurrentNoJob() throws Exception {
+ testServerContentAddressableGetStorageLocationConcurrent(null);
+ }
+
+ /**
+ * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, BlobKey)}.
+ */
+ @Test
+ public void testServerContentAddressableGetStorageLocationConcurrentForJob() throws Exception {
+ testServerContentAddressableGetStorageLocationConcurrent(new JobID());
+ }
+
+ private void testServerContentAddressableGetStorageLocationConcurrent(final JobID jobId)
+ throws Exception {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -101,9 +118,9 @@ public class BlobServerPutTest extends TestLogger {
try {
BlobKey key = new BlobKey();
CheckedThread[] threads = new CheckedThread[] {
- new ContentAddressableGetStorageLocation(server, key),
- new ContentAddressableGetStorageLocation(server, key),
- new ContentAddressableGetStorageLocation(server, key)
+ new ContentAddressableGetStorageLocation(server, jobId, key),
+ new ContentAddressableGetStorageLocation(server, jobId, key),
+ new ContentAddressableGetStorageLocation(server, jobId, key)
};
checkedThreadSimpleTest(threads);
} finally {
@@ -134,7 +151,27 @@ public class BlobServerPutTest extends TestLogger {
// --------------------------------------------------------------------------------------------
@Test
- public void testPutBufferSuccessful() throws IOException {
+ public void testPutBufferSuccessfulGet1() throws IOException {
+ testPutBufferSuccessfulGet(null, null);
+ }
+
+ @Test
+ public void testPutBufferSuccessfulGet2() throws IOException {
+ testPutBufferSuccessfulGet(null, new JobID());
+ }
+
+ @Test
+ public void testPutBufferSuccessfulGet3() throws IOException {
+ testPutBufferSuccessfulGet(new JobID(), new JobID());
+ }
+
+ @Test
+ public void testPutBufferSuccessfulGet4() throws IOException {
+ testPutBufferSuccessfulGet(new JobID(), null);
+ }
+
+ private void testPutBufferSuccessfulGet(final JobID jobId1, final JobID jobId2)
+ throws IOException {
BlobServer server = null;
BlobClient client = null;
@@ -150,17 +187,66 @@ public class BlobServerPutTest extends TestLogger {
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- // put content addressable (like libraries)
- BlobKey key1 = client.put(data);
- assertNotNull(key1);
+ // put data for jobId1 and verify
+ BlobKey key1a = client.put(jobId1, data);
+ assertNotNull(key1a);
+
+ BlobKey key1b = client.put(jobId1, data, 10, 44);
+ assertNotNull(key1b);
+
+ testPutBufferSuccessfulGet(jobId1, key1a, key1b, data, serverAddress, config);
+
+ // now put data for jobId2 and verify that both are ok
+ BlobKey key2a = client.put(jobId2, data);
+ assertNotNull(key2a);
+ assertEquals(key1a, key2a);
+
+ BlobKey key2b = client.put(jobId2, data, 10, 44);
+ assertNotNull(key2b);
+ assertEquals(key1b, key2b);
+
+
+ testPutBufferSuccessfulGet(jobId1, key1a, key1b, data, serverAddress, config);
+ testPutBufferSuccessfulGet(jobId2, key2a, key2b, data, serverAddress, config);
+
+
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ if (server != null) {
+ server.close();
+ }
+ }
+ }
- BlobKey key2 = client.put(data, 10, 44);
- assertNotNull(key2);
+ /**
+ * GET the data stored at the two keys and check that it is equal to <tt>data</tt>.
+ *
+ * @param jobId
+ * job ID or <tt>null</tt> if job-unrelated
+ * @param key1
+ * first key
+ * @param key2
+ * second key
+ * @param data
+ * expected data
+ * @param serverAddress
+ * BlobServer address to connect to
+ * @param config
+ * client configuration
+ */
+ private static void testPutBufferSuccessfulGet(
+ JobID jobId, BlobKey key1, BlobKey key2, byte[] data,
+ InetSocketAddress serverAddress, Configuration config) throws IOException {
- // --- GET the data and check that it is equal ---
+ BlobClient client = new BlobClient(serverAddress, config);
+ InputStream is1 = null;
+ InputStream is2 = null;
+ try {
// one get request on the same client
- InputStream is1 = client.get(key2);
+ is1 = getFileHelper(client, jobId, key2);
byte[] result1 = new byte[44];
BlobUtils.readFully(is1, result1, 0, result1.length, null);
is1.close();
@@ -173,24 +259,31 @@ public class BlobServerPutTest extends TestLogger {
client.close();
client = new BlobClient(serverAddress, config);
- InputStream is2 = client.get(key1);
- byte[] result2 = new byte[data.length];
- BlobUtils.readFully(is2, result2, 0, result2.length, null);
+ is2 = getFileHelper(client, jobId, key1);
+ BlobClientTest.validateGet(is2, data);
is2.close();
- assertArrayEquals(data, result2);
} finally {
- if (client != null) {
- client.close();
+ if (is1 != null) {
+ is1.close();
}
- if (server != null) {
- server.close();
+ if (is2 != null) {
+ is2.close();
}
+ client.close();
}
}
+ @Test
+ public void testPutStreamSuccessfulNoJob() throws IOException {
+ testPutStreamSuccessful(null);
+ }
@Test
- public void testPutStreamSuccessful() throws IOException {
+ public void testPutStreamSuccessfulForJob() throws IOException {
+ testPutStreamSuccessful(new JobID());
+ }
+
+ private void testPutStreamSuccessful(final JobID jobId) throws IOException {
BlobServer server = null;
BlobClient client = null;
@@ -208,7 +301,12 @@ public class BlobServerPutTest extends TestLogger {
// put content addressable (like libraries)
{
- BlobKey key1 = client.put(new ByteArrayInputStream(data));
+ BlobKey key1;
+ if (jobId == null) {
+ key1 = client.put(new ByteArrayInputStream(data));
+ } else {
+ key1 = client.put(jobId, new ByteArrayInputStream(data));
+ }
assertNotNull(key1);
}
} finally {
@@ -226,7 +324,16 @@ public class BlobServerPutTest extends TestLogger {
}
@Test
- public void testPutChunkedStreamSuccessful() throws IOException {
+ public void testPutChunkedStreamSuccessfulNoJob() throws IOException {
+ testPutChunkedStreamSuccessful(null);
+ }
+
+ @Test
+ public void testPutChunkedStreamSuccessfulForJob() throws IOException {
+ testPutChunkedStreamSuccessful(new JobID());
+ }
+
+ private void testPutChunkedStreamSuccessful(final JobID jobId) throws IOException {
BlobServer server = null;
BlobClient client = null;
@@ -244,7 +351,12 @@ public class BlobServerPutTest extends TestLogger {
// put content addressable (like libraries)
{
- BlobKey key1 = client.put(new ChunkedInputStream(data, 19));
+ BlobKey key1;
+ if (jobId == null) {
+ key1 = client.put(new ChunkedInputStream(data, 19));
+ } else {
+ key1 = client.put(jobId, new ChunkedInputStream(data, 19));
+ }
assertNotNull(key1);
}
} finally {
@@ -258,7 +370,16 @@ public class BlobServerPutTest extends TestLogger {
}
@Test
- public void testPutBufferFails() throws IOException {
+ public void testPutBufferFailsNoJob() throws IOException {
+ testPutBufferFails(null);
+ }
+
+ @Test
+ public void testPutBufferFailsForJob() throws IOException {
+ testPutBufferFails(new JobID());
+ }
+
+ private void testPutBufferFails(final JobID jobId) throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
BlobServer server = null;
@@ -285,7 +406,7 @@ public class BlobServerPutTest extends TestLogger {
// put content addressable (like libraries)
try {
- client.put(data);
+ client.put(jobId, data);
fail("This should fail.");
}
catch (IOException e) {
@@ -293,7 +414,7 @@ public class BlobServerPutTest extends TestLogger {
}
try {
- client.put(data);
+ client.put(jobId, data);
fail("Client should be closed");
}
catch (IllegalStateException e) {
@@ -320,7 +441,22 @@ public class BlobServerPutTest extends TestLogger {
* Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
*/
@Test
- public void testConcurrentPutOperations() throws IOException, ExecutionException, InterruptedException {
+ public void testConcurrentPutOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+ testConcurrentPutOperations(null);
+ }
+
+ /**
+ * FLINK-6020
+ *
+ * Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
+ */
+ @Test
+ public void testConcurrentPutOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+ testConcurrentPutOperations(new JobID());
+ }
+
+ private void testConcurrentPutOperations(final JobID jobId)
+ throws IOException, InterruptedException, ExecutionException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -331,7 +467,7 @@ public class BlobServerPutTest extends TestLogger {
final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
final byte[] data = new byte[dataSize];
- ArrayList<CompletableFuture<BlobKey>> allFutures = new ArrayList(concurrentPutOperations);
+ ArrayList<CompletableFuture<BlobKey>> allFutures = new ArrayList<>(concurrentPutOperations);
ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
@@ -342,7 +478,13 @@ public class BlobServerPutTest extends TestLogger {
CompletableFuture<BlobKey> putFuture = CompletableFuture.supplyAsync(
() -> {
try (BlobClient blobClient = blobServer.createClient()) {
- return blobClient.put(new BlockingInputStream(countDownLatch, data));
+ if (jobId == null) {
+ return blobClient
+ .put(new BlockingInputStream(countDownLatch, data));
+ } else {
+ return blobClient
+ .put(jobId, new BlockingInputStream(countDownLatch, data));
+ }
} catch (IOException e) {
throw new FlinkFutureException("Could not upload blob.", e);
}
@@ -369,7 +511,7 @@ public class BlobServerPutTest extends TestLogger {
}
// check that we only uploaded the file once to the blob store
- verify(blobStore, times(1)).put(any(File.class), eq(blobKey));
+ verify(blobStore, times(1)).put(any(File.class), eq(jobId), eq(blobKey));
} finally {
executor.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index 2987c39..e449aab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -18,11 +18,9 @@
package org.apache.flink.runtime.blob;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-import static org.mockito.Mockito.mock;
-
+import org.apache.flink.api.common.JobID;
import org.apache.flink.util.OperatingSystem;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -32,6 +30,10 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
+
public class BlobUtilsTest {
private final static String CANNOT_CREATE_THIS = "cannot-create-this";
@@ -62,12 +64,18 @@ public class BlobUtilsTest {
public void testExceptionOnCreateStorageDirectoryFailure() throws
IOException {
// Should throw an Exception
- BlobUtils.initStorageDirectory(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS).getAbsolutePath());
+ BlobUtils.initLocalStorageDirectory(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS).getAbsolutePath());
+ }
+
+ @Test(expected = Exception.class)
+ public void testExceptionOnCreateCacheDirectoryFailureNoJob() {
+ // Should throw an Exception
+ BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), null, mock(BlobKey.class));
}
@Test(expected = Exception.class)
- public void testExceptionOnCreateCacheDirectoryFailure() {
+ public void testExceptionOnCreateCacheDirectoryFailureForJob() {
// Should throw an Exception
- BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), mock(BlobKey.class));
+ BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), new JobID(), mock(BlobKey.class));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 476fdcb..b43a307 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -73,17 +73,20 @@ public class BlobLibraryCacheManagerTest {
InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
BlobClient bc = new BlobClient(blobSocketAddress, config);
- keys.add(bc.put(buf));
+ // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+ JobID jobId = null;
+
+ keys.add(bc.put(jobId, buf));
buf[0] += 1;
- keys.add(bc.put(buf));
+ keys.add(bc.put(jobId, buf));
bc.close();
- long cleanupInterval = 1000l;
+ long cleanupInterval = 1000L;
libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
- assertEquals(2, checkFilesExist(keys, server, true));
+ assertEquals(2, checkFilesExist(jobId, keys, server, true));
assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
@@ -105,17 +108,25 @@ public class BlobLibraryCacheManagerTest {
assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
// the blob cache should no longer contain the files
- assertEquals(0, checkFilesExist(keys, server, false));
+ assertEquals(0, checkFilesExist(jobId, keys, server, false));
try {
- server.getFile(keys.get(0));
- fail("name-addressable BLOB should have been deleted");
+ if (jobId == null) {
+ server.getFile(keys.get(0));
+ } else {
+ server.getFile(jobId, keys.get(0));
+ }
+ fail("BLOB should have been deleted");
} catch (IOException e) {
// expected
}
try {
- server.getFile(keys.get(1));
- fail("name-addressable BLOB should have been deleted");
+ if (jobId == null) {
+ server.getFile(keys.get(1));
+ } else {
+ server.getFile(jobId, keys.get(1));
+ }
+ fail("BLOB should have been deleted");
} catch (IOException e) {
// expected
}
@@ -150,16 +161,20 @@ public class BlobLibraryCacheManagerTest {
* @param doThrow
* whether exceptions should be ignored (<tt>false</tt>), or throws (<tt>true</tt>)
*
- * @return number of files we were able to retrieve via {@link BlobService#getFile(BlobKey)}
+ * @return number of files we were able to retrieve via {@link BlobService#getFile}
*/
private static int checkFilesExist(
- List<BlobKey> keys, BlobService blobService, boolean doThrow)
+ JobID jobId, List<BlobKey> keys, BlobService blobService, boolean doThrow)
throws IOException {
int numFiles = 0;
for (BlobKey key : keys) {
try {
- blobService.getFile(key);
+ if (jobId == null) {
+ blobService.getFile(key);
+ } else {
+ blobService.getFile(jobId, key);
+ }
++numFiles;
} catch (IOException e) {
if (doThrow) {
@@ -196,22 +211,26 @@ public class BlobLibraryCacheManagerTest {
InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
BlobClient bc = new BlobClient(blobSocketAddress, config);
- keys.add(bc.put(buf));
+ // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+// JobID jobId = new JobID();
+ JobID jobId = null;
+
+ keys.add(bc.put(jobId, buf));
buf[0] += 1;
- keys.add(bc.put(buf));
+ keys.add(bc.put(jobId, buf));
- long cleanupInterval = 1000l;
+ long cleanupInterval = 1000L;
libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
libraryCacheManager.registerTask(jid, executionId1, keys, Collections.<URL>emptyList());
libraryCacheManager.registerTask(jid, executionId2, keys, Collections.<URL>emptyList());
- assertEquals(2, checkFilesExist(keys, server, true));
+ assertEquals(2, checkFilesExist(jobId, keys, server, true));
assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
assertEquals(2, libraryCacheManager.getNumberOfReferenceHolders(jid));
libraryCacheManager.unregisterTask(jid, executionId1);
- assertEquals(2, checkFilesExist(keys, server, true));
+ assertEquals(2, checkFilesExist(jobId, keys, server, true));
assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
@@ -233,7 +252,7 @@ public class BlobLibraryCacheManagerTest {
assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
// the blob cache should no longer contain the files
- assertEquals(0, checkFilesExist(keys, server, false));
+ assertEquals(0, checkFilesExist(jobId, keys, server, false));
bc.close();
} finally {
@@ -269,10 +288,13 @@ public class BlobLibraryCacheManagerTest {
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
cache = new BlobCache(serverAddress, config, new VoidBlobStore());
+ // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+ JobID jobId = null;
+
// upload some meaningless data to the server
BlobClient uploader = new BlobClient(serverAddress, config);
- BlobKey dataKey1 = uploader.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
- BlobKey dataKey2 = uploader.put(new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
+ BlobKey dataKey1 = uploader.put(jobId, new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
+ BlobKey dataKey2 = uploader.put(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
uploader.close();
BlobLibraryCacheManager libCache = new BlobLibraryCacheManager(cache, 1000000000L);
@@ -316,11 +338,12 @@ public class BlobLibraryCacheManagerTest {
fail("Should fail with an IllegalStateException");
}
catch (IllegalStateException e) {
- // that#s what we want
+ // that's what we want
}
}
- cacheDir = new File(cache.getStorageDir(), "cache");
+ // see BlobUtils for the directory layout
+ cacheDir = new File(cache.getStorageDir(), "no_job");
assertTrue(cacheDir.exists());
// make sure no further blobs can be downloaded by removing the write
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index b19835b..2f6738d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -91,10 +91,14 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
List<BlobKey> keys = new ArrayList<>(2);
+ JobID jobId = new JobID();
+ // TODO: replace+adapt by jobId after adapting the BlobLibraryCacheManager
+ JobID blobJobId = null;
+
// Upload some data (libraries)
try (BlobClient client = new BlobClient(serverAddress[0], config)) {
- keys.add(client.put(expected)); // Request 1
- keys.add(client.put(expected, 32, 256)); // Request 2
+ keys.add(client.put(blobJobId, expected)); // Request 1
+ keys.add(client.put(blobJobId, expected, 32, 256)); // Request 2
}
// The cache
@@ -102,7 +106,6 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
// Register uploaded libraries
- JobID jobId = new JobID();
ExecutionAttemptID executionId = new ExecutionAttemptID();
libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 79b9c1c..3c75971 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -136,9 +137,11 @@ public class JobSubmitTest {
// upload two dummy bytes and add their keys to the job graph as dependencies
BlobKey key1, key2;
BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort), jmConfig);
+ // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+ JobID jobId = null;
try {
- key1 = bc.put(new byte[10]);
- key2 = bc.put(new byte[10]);
+ key1 = bc.put(jobId, new byte[10]);
+ key2 = bc.put(jobId, new byte[10]);
// delete one of the blobs to make sure that the startup failed
bc.delete(key2);
[3/3] flink git commit: [FLINK-7056][blob] add API to allow job-related BLOBs to be stored
Posted by tr...@apache.org.
[FLINK-7056][blob] add API to allow job-related BLOBs to be stored
[FLINK-7056][blob] refactor the new API for job-related BLOBs
For a cleaner API, instead of having a nullable jobId parameter, use two methods:
one for job-related BLOBs, another for job-unrelated ones.
This closes #4237.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a19c456
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a19c456
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a19c456
Branch: refs/heads/master
Commit: 0a19c456ac7781d94eb0aaaf8f2ac73d0157bacb
Parents: b7c1dfa
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Jun 21 18:04:43 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Aug 14 11:06:24 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/blob/BlobCache.java | 167 +++++++--
.../apache/flink/runtime/blob/BlobClient.java | 361 +++++++++++++------
.../apache/flink/runtime/blob/BlobServer.java | 132 +++++--
.../runtime/blob/BlobServerConnection.java | 228 +++++++-----
.../flink/runtime/blob/BlobServerProtocol.java | 12 +-
.../apache/flink/runtime/blob/BlobService.java | 29 +-
.../apache/flink/runtime/blob/BlobStore.java | 6 +-
.../apache/flink/runtime/blob/BlobUtils.java | 151 +++++---
.../org/apache/flink/runtime/blob/BlobView.java | 5 +-
.../flink/runtime/blob/FileSystemBlobStore.java | 14 +-
.../flink/runtime/blob/VoidBlobStore.java | 7 +-
.../apache/flink/runtime/client/JobClient.java | 1 +
.../apache/flink/runtime/jobgraph/JobGraph.java | 1 +
.../runtime/blob/BlobCacheRetriesTest.java | 103 ++++--
.../runtime/blob/BlobCacheSuccessTest.java | 100 ++++-
.../flink/runtime/blob/BlobClientTest.java | 36 +-
.../flink/runtime/blob/BlobRecoveryITCase.java | 34 +-
.../runtime/blob/BlobServerDeleteTest.java | 143 ++++++--
.../flink/runtime/blob/BlobServerGetTest.java | 133 ++++++-
.../flink/runtime/blob/BlobServerPutTest.java | 212 +++++++++--
.../flink/runtime/blob/BlobUtilsTest.java | 22 +-
.../BlobLibraryCacheManagerTest.java | 67 ++--
.../BlobLibraryCacheRecoveryITCase.java | 9 +-
.../flink/runtime/jobmanager/JobSubmitTest.java | 7 +-
24 files changed, 1503 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 3e19537..29f7706 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,12 +18,15 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -38,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The BLOB cache implements a local cache for content-addressable BLOBs.
*
- * <p>When requesting BLOBs through the {@link BlobCache#getFile(BlobKey)} method, the
+ * <p>When requesting BLOBs through the {@link BlobCache#getFile} methods, the
* BLOB cache will first attempt to serve the file from its local cache. Only if
* the local cache does not contain the desired BLOB, the BLOB cache will try to
* download it from a distributed file system (if available) or the BLOB
@@ -91,7 +94,7 @@ public final class BlobCache implements BlobService {
// configure and create the storage directory
String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
- this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
+ this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
LOG.info("Created BLOB cache storage directory " + storageDir);
// configure the number of fetch retries
@@ -110,19 +113,66 @@ public final class BlobCache implements BlobService {
}
/**
- * Returns local copy of the file for the BLOB with the given key. The method will first attempt to serve
- * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
- * from this cache's BLOB server.
+ * Returns local copy of the (job-unrelated) file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB server.
+ *
+ * @param key
+ * The key of the desired BLOB.
*
- * @param requiredBlob The key of the desired BLOB.
* @return file referring to the local storage location of the BLOB.
- * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
*/
@Override
- public File getFile(final BlobKey requiredBlob) throws IOException {
+ public File getFile(BlobKey key) throws IOException {
+ return getFileInternal(null, key);
+ }
+
+ /**
+ * Returns local copy of the file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+ */
+ @Override
+ public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
+ checkNotNull(jobId);
+ return getFileInternal(jobId, key);
+ }
+
+ /**
+ * Returns local copy of the file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+ * @param requiredBlob
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+ */
+ private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
checkArgument(requiredBlob != null, "BLOB key cannot be null.");
- final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
+ final File localJarFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
if (localJarFile.exists()) {
return localJarFile;
@@ -130,7 +180,7 @@ public final class BlobCache implements BlobService {
// first try the distributed blob store (if available)
try {
- blobView.get(requiredBlob, localJarFile);
+ blobView.get(jobId, requiredBlob, localJarFile);
} catch (Exception e) {
LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
}
@@ -141,14 +191,14 @@ public final class BlobCache implements BlobService {
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
- LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
+ LOG.info("Downloading {}/{} from {}", jobId, requiredBlob, serverAddress);
// loop over retries
int attempt = 0;
while (true) {
try (
final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
- final InputStream is = bc.get(requiredBlob);
+ final InputStream is = bc.getInternal(jobId, requiredBlob);
final OutputStream os = new FileOutputStream(localJarFile)
) {
while (true) {
@@ -163,7 +213,7 @@ public final class BlobCache implements BlobService {
return localJarFile;
}
catch (Throwable t) {
- String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
+ String message = "Failed to fetch BLOB " + jobId + "/" + requiredBlob + " from " + serverAddress +
" and store it under " + localJarFile.getAbsolutePath();
if (attempt < numFetchRetries) {
if (LOG.isDebugEnabled()) {
@@ -179,41 +229,110 @@ public final class BlobCache implements BlobService {
// retry
++attempt;
- LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
+ LOG.info("Downloading {}/{} from {} (retry {})", jobId, requiredBlob, serverAddress, attempt);
}
} // end loop over retries
}
/**
- * Deletes the file associated with the given key from the BLOB cache.
- * @param key referring to the file to be deleted
+ * Deletes the (job-unrelated) file associated with the blob key in this BLOB cache.
+ *
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
*/
@Override
- public void delete(BlobKey key) throws IOException{
- final File localFile = BlobUtils.getStorageLocation(storageDir, key);
+ public void delete(BlobKey key) throws IOException {
+ deleteInternal(null, key);
+ }
+ /**
+ * Deletes the file associated with the blob key in this BLOB cache.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
+ */
+ @Override
+ public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+ checkNotNull(jobId);
+ deleteInternal(jobId, key);
+ }
+
+ /**
+ * Deletes the file associated with the blob key in this BLOB cache.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
+ */
+ private void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOException{
+ final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
if (!localFile.delete() && localFile.exists()) {
LOG.warn("Failed to delete locally cached BLOB {} at {}", key, localFile.getAbsolutePath());
}
}
/**
- * Deletes the file associated with the given key from the BLOB cache and
+ * Deletes the (job-unrelated) file associated with the given key from the BLOB cache and
* BLOB server.
*
- * @param key referring to the file to be deleted
+ * @param key
+ * referring to the file to be deleted
+ *
* @throws IOException
- * thrown if an I/O error occurs while transferring the request to
- * the BLOB server or if the BLOB server cannot delete the file
+ * thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+ * BLOB server cannot delete the file
*/
public void deleteGlobal(BlobKey key) throws IOException {
+ deleteGlobalInternal(null, key);
+ }
+
+ /**
+ * Deletes the file associated with the given key from the BLOB cache and BLOB server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * referring to the file to be deleted
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+ * BLOB server cannot delete the file
+ */
+ public void deleteGlobal(@Nonnull JobID jobId, BlobKey key) throws IOException {
+ checkNotNull(jobId);
+ deleteGlobalInternal(jobId, key);
+ }
+
+ /**
+ * Deletes the file associated with the given key from the BLOB cache and
+ * BLOB server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+ * @param key
+ * referring to the file to be deleted
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+ * BLOB server cannot delete the file
+ */
+ private void deleteGlobalInternal(@Nullable JobID jobId, BlobKey key) throws IOException {
// delete locally
- delete(key);
+ deleteInternal(jobId, key);
// then delete on the BLOB server
// (don't use the distributed storage directly - this way the blob
// server is aware of the delete operation, too)
try (BlobClient bc = createClient()) {
- bc.delete(key);
+ bc.deleteInternal(jobId, key);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 0882ec3..9a2f59e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
@@ -29,6 +30,8 @@ import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
@@ -46,7 +49,8 @@ import java.util.Collections;
import java.util.List;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_NO_JOB;
import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
@@ -55,7 +59,7 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
-import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The BLOB client can communicate with the BLOB server and either upload (PUT), download (GET),
@@ -75,6 +79,7 @@ public final class BlobClient implements Closeable {
* the network address of the BLOB server
* @param clientConfig
* additional configuration like SSL parameters required to connect to the blob server
+ *
* @throws IOException
* thrown if the connection to the BLOB server could not be established
*/
@@ -130,22 +135,65 @@ public final class BlobClient implements Closeable {
// --------------------------------------------------------------------------------------------
/**
- * Downloads the BLOB identified by the given BLOB key from the BLOB server. If no such BLOB exists on the server, a
- * {@link FileNotFoundException} is thrown.
- *
+ * Downloads the (job-unrelated) BLOB identified by the given BLOB key from the BLOB server.
+ *
* @param blobKey
- * the BLOB key identifying the BLOB to download
+ * blob key associated with the requested file
+ *
* @return an input stream to read the retrieved data from
+ *
+ * @throws FileNotFoundException
+ * if there is no such file;
* @throws IOException
- * thrown if an I/O error occurs during the download
+ * if an I/O error occurs during the download
*/
public InputStream get(BlobKey blobKey) throws IOException {
+ return getInternal(null, blobKey);
+ }
+
+ /**
+ * Downloads the BLOB identified by the given BLOB key from the BLOB server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param blobKey
+ * blob key associated with the requested file
+ *
+ * @return an input stream to read the retrieved data from
+ *
+ * @throws FileNotFoundException
+ * if there is no such file;
+ * @throws IOException
+ * if an I/O error occurs during the download
+ */
+ public InputStream get(@Nonnull JobID jobId, BlobKey blobKey) throws IOException {
+ checkNotNull(jobId);
+ return getInternal(jobId, blobKey);
+ }
+
+ /**
+ * Downloads the BLOB identified by the given BLOB key from the BLOB server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+ * @param blobKey
+ * blob key associated with the requested file
+ *
+ * @return an input stream to read the retrieved data from
+ *
+ * @throws FileNotFoundException
+ * if there is no such file;
+ * @throws IOException
+ * if an I/O error occurs during the download
+ */
+ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
if (this.socket.isClosed()) {
throw new IllegalStateException("BLOB Client is not connected. " +
"Client has been shut down or encountered an error before.");
}
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("GET content addressable BLOB %s from %s", blobKey, socket.getLocalSocketAddress()));
+ LOG.debug("GET BLOB {}/{} from {}.", jobId, blobKey,
+ socket.getLocalSocketAddress());
}
try {
@@ -153,8 +201,8 @@ public final class BlobClient implements Closeable {
InputStream is = this.socket.getInputStream();
// Send GET header
- sendGetHeader(os, null, blobKey);
- receiveAndCheckResponse(is);
+ sendGetHeader(os, jobId, blobKey);
+ receiveAndCheckGetResponse(is);
return new BlobInputStream(is, blobKey);
}
@@ -169,29 +217,40 @@ public final class BlobClient implements Closeable {
*
* @param outputStream
* the output stream to write the header data to
- * @param jobID
- * the job ID identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used
- * to identify the BLOB on the server instead
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey
- * the BLOB key to identify the BLOB to download if either the job ID or the regular key are
- * <code>null</code>
+ * blob key associated with the requested file
+ *
* @throws IOException
* thrown if an I/O error occurs while writing the header data to the output stream
*/
- private void sendGetHeader(OutputStream outputStream, JobID jobID, BlobKey blobKey) throws IOException {
- checkArgument(jobID == null);
+ private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
+ checkNotNull(blobKey);
// Signal type of operation
outputStream.write(GET_OPERATION);
- // Check if GET should be done in content-addressable manner
- if (jobID == null) {
- outputStream.write(CONTENT_ADDRESSABLE);
- blobKey.writeToOutputStream(outputStream);
+ // Send job ID and key
+ if (jobId == null) {
+ outputStream.write(CONTENT_NO_JOB);
+ } else {
+ outputStream.write(CONTENT_FOR_JOB);
+ outputStream.write(jobId.getBytes());
}
+ blobKey.writeToOutputStream(outputStream);
}
- private void receiveAndCheckResponse(InputStream is) throws IOException {
+ /**
+ * Reads the response from the input stream and throws in case of errors
+ *
+ * @param is
+ * stream to read from
+ *
+ * @throws IOException
+ * if the response is an error or reading the response failed
+ */
+ private static void receiveAndCheckGetResponse(InputStream is) throws IOException {
int response = is.read();
if (response < 0) {
throw new EOFException("Premature end of response");
@@ -211,82 +270,111 @@ public final class BlobClient implements Closeable {
// --------------------------------------------------------------------------------------------
/**
- * Uploads the data of the given byte array to the BLOB server in a content-addressable manner.
+ * Uploads the data of the given byte array for the given job to the BLOB server.
*
+ * @param jobId
+ * the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
* @param value
- * the buffer to upload
+ * the buffer to upload
+ *
* @return the computed BLOB key identifying the BLOB on the server
+ *
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to the BLOB server
+ * thrown if an I/O error occurs while uploading the data to the BLOB server
*/
- public BlobKey put(byte[] value) throws IOException {
- return put(value, 0, value.length);
+ @VisibleForTesting
+ public BlobKey put(@Nullable JobID jobId, byte[] value) throws IOException {
+ return put(jobId, value, 0, value.length);
}
/**
- * Uploads data from the given byte array to the BLOB server in a content-addressable manner.
+ * Uploads data from the given byte array for the given job to the BLOB server.
*
+ * @param jobId
+ * the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
* @param value
- * the buffer to upload data from
+ * the buffer to upload data from
* @param offset
- * the read offset within the buffer
+ * the read offset within the buffer
* @param len
- * the number of bytes to upload from the buffer
+ * the number of bytes to upload from the buffer
+ *
* @return the computed BLOB key identifying the BLOB on the server
+ *
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to the BLOB server
+ * thrown if an I/O error occurs while uploading the data to the BLOB server
*/
- public BlobKey put(byte[] value, int offset, int len) throws IOException {
- return putBuffer(null, value, offset, len);
+ @VisibleForTesting
+ public BlobKey put(@Nullable JobID jobId, byte[] value, int offset, int len) throws IOException {
+ return putBuffer(jobId, value, offset, len);
}
/**
- * Uploads the data from the given input stream to the BLOB server in a content-addressable manner.
+ * Uploads all remaining (job-unrelated) data of the given input stream to the BLOB server.
*
* @param inputStream
- * the input stream to read the data from
+ * the input stream to read the data from
+ *
* @return the computed BLOB key identifying the BLOB on the server
+ *
* @throws IOException
- * thrown if an I/O error occurs while reading the data from the input stream or uploading the data to the
- * BLOB server
+ * thrown if an I/O error occurs while reading the data from the input stream or uploading the
+ * data to the BLOB server
*/
public BlobKey put(InputStream inputStream) throws IOException {
return putInputStream(null, inputStream);
}
/**
+ * Uploads the data from the given input stream for the given job to the BLOB server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (must not be <tt>null</tt>)
+ * @param inputStream
+ * the input stream to read the data from
+ *
+ * @return the computed BLOB key identifying the BLOB on the server
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while reading the data from the input stream or uploading the
+ * data to the BLOB server
+ */
+ public BlobKey put(@Nonnull JobID jobId, InputStream inputStream) throws IOException {
+ checkNotNull(jobId);
+ return putInputStream(jobId, inputStream);
+ }
+
+ /**
* Uploads data from the given byte buffer to the BLOB server.
*
* @param jobId
- * the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
- * manner
+ * the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
* @param value
- * the buffer to read the data from
+ * the buffer to read the data from
* @param offset
- * the read offset within the buffer
+ * the read offset within the buffer
* @param len
- * the number of bytes to read from the buffer
- * @return the computed BLOB key if the BLOB has been stored in a content-addressable manner, <code>null</code>
- * otherwise
+ * the number of bytes to read from the buffer
+ *
+ * @return the computed BLOB key of the uploaded BLOB
+ *
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to the BLOB server
+ * thrown if an I/O error occurs while uploading the data to the BLOB server
*/
- private BlobKey putBuffer(JobID jobId, byte[] value, int offset, int len) throws IOException {
+ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int offset, int len) throws IOException {
if (this.socket.isClosed()) {
throw new IllegalStateException("BLOB Client is not connected. " +
"Client has been shut down or encountered an error before.");
}
+ checkNotNull(value);
if (LOG.isDebugEnabled()) {
- if (jobId == null) {
- LOG.debug(String.format("PUT content addressable BLOB buffer (%d bytes) to %s",
- len, socket.getLocalSocketAddress()));
- }
+ LOG.debug("PUT BLOB buffer ({} bytes) to {}.", len, socket.getLocalSocketAddress());
}
try {
final OutputStream os = this.socket.getOutputStream();
- final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
+ final MessageDigest md = BlobUtils.createMessageDigest();
// Send the PUT header
sendPutHeader(os, jobId);
@@ -295,15 +383,15 @@ public final class BlobClient implements Closeable {
int remainingBytes = len;
while (remainingBytes > 0) {
+ // want a common code path for byte[] and InputStream at the BlobServer
+ // -> since for InputStream we don't know a total size beforehand, send lengths iteratively
final int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes);
writeLength(bytesToSend, os);
os.write(value, offset, bytesToSend);
- // Update the message digest if necessary
- if (md != null) {
- md.update(value, offset, bytesToSend);
- }
+ // Update the message digest
+ md.update(value, offset, bytesToSend);
remainingBytes -= bytesToSend;
offset += bytesToSend;
@@ -313,7 +401,7 @@ public final class BlobClient implements Closeable {
// Receive blob key and compare
final InputStream is = this.socket.getInputStream();
- return receivePutResponseAndCompare(is, md);
+ return receiveAndCheckPutResponse(is, md);
}
catch (Throwable t) {
BlobUtils.closeSilently(socket, LOG);
@@ -325,37 +413,36 @@ public final class BlobClient implements Closeable {
* Uploads data from the given input stream to the BLOB server.
*
* @param jobId
- * the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
- * manner
+ * the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
* @param inputStream
- * the input stream to read the data from
- * @return he computed BLOB key if the BLOB has been stored in a content-addressable manner, <code>null</code>
- * otherwise
+ * the input stream to read the data from
+ *
+ * @return the computed BLOB key of the uploaded BLOB
+ *
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to the BLOB server
+ * thrown if an I/O error occurs while uploading the data to the BLOB server
*/
- private BlobKey putInputStream(JobID jobId, InputStream inputStream) throws IOException {
+ private BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream) throws IOException {
if (this.socket.isClosed()) {
throw new IllegalStateException("BLOB Client is not connected. " +
"Client has been shut down or encountered an error before.");
}
+ checkNotNull(inputStream);
if (LOG.isDebugEnabled()) {
- if (jobId == null) {
- LOG.debug(String.format("PUT content addressable BLOB stream to %s",
- socket.getLocalSocketAddress()));
- }
+ LOG.debug("PUT BLOB stream to {}.", socket.getLocalSocketAddress());
}
try {
final OutputStream os = this.socket.getOutputStream();
- final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
+ final MessageDigest md = BlobUtils.createMessageDigest();
final byte[] xferBuf = new byte[BUFFER_SIZE];
// Send the PUT header
sendPutHeader(os, jobId);
while (true) {
+ // since we don't know a total size here, send lengths iteratively
final int read = inputStream.read(xferBuf);
if (read < 0) {
// we are done. send a -1 and be done
@@ -365,15 +452,13 @@ public final class BlobClient implements Closeable {
if (read > 0) {
writeLength(read, os);
os.write(xferBuf, 0, read);
- if (md != null) {
- md.update(xferBuf, 0, read);
- }
+ md.update(xferBuf, 0, read);
}
}
// Receive blob key and compare
final InputStream is = this.socket.getInputStream();
- return receivePutResponseAndCompare(is, md);
+ return receiveAndCheckPutResponse(is, md);
}
catch (Throwable t) {
BlobUtils.closeSilently(socket, LOG);
@@ -381,16 +466,25 @@ public final class BlobClient implements Closeable {
}
}
- private BlobKey receivePutResponseAndCompare(InputStream is, MessageDigest md) throws IOException {
+ /**
+ * Reads the response from the input stream and throws in case of errors
+ *
+ * @param is
+ * stream to read from
+ * @param md
+ * message digest to check the response against
+ *
+ * @throws IOException
+ * if the response is an error, the message digest does not match or reading the response
+ * failed
+ */
+ private static BlobKey receiveAndCheckPutResponse(InputStream is, MessageDigest md)
+ throws IOException {
int response = is.read();
if (response < 0) {
throw new EOFException("Premature end of response");
}
else if (response == RETURN_OKAY) {
- if (md == null) {
- // not content addressable
- return null;
- }
BlobKey remoteKey = BlobKey.readFromInputStream(is);
BlobKey localKey = new BlobKey(md.digest());
@@ -412,24 +506,24 @@ public final class BlobClient implements Closeable {
/**
* Constructs and writes the header data for a PUT request to the given output stream.
- * NOTE: If the jobId and key are null, we send the data to the content addressable section.
*
* @param outputStream
- * the output stream to write the PUT header data to
- * @param jobID
- * the ID of job the BLOB belongs to or <code>null</code> to indicate the upload of a
- * content-addressable BLOB
+ * the output stream to write the PUT header data to
+ * @param jobId
+ * the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
+ *
* @throws IOException
- * thrown if an I/O error occurs while writing the header data to the output stream
+ * thrown if an I/O error occurs while writing the header data to the output stream
*/
- private void sendPutHeader(OutputStream outputStream, JobID jobID) throws IOException {
- checkArgument(jobID == null);
-
+ private static void sendPutHeader(OutputStream outputStream, @Nullable JobID jobId) throws IOException {
// Signal type of operation
outputStream.write(PUT_OPERATION);
-
- // Check if PUT should be done in content-addressable manner
- outputStream.write(CONTENT_ADDRESSABLE);
+ if (jobId == null) {
+ outputStream.write(CONTENT_NO_JOB);
+ } else {
+ outputStream.write(CONTENT_FOR_JOB);
+ outputStream.write(jobId.getBytes());
+ }
}
// --------------------------------------------------------------------------------------------
@@ -437,16 +531,50 @@ public final class BlobClient implements Closeable {
// --------------------------------------------------------------------------------------------
/**
- * Deletes the BLOB identified by the given BLOB key from the BLOB server.
+ * Deletes the (job-unrelated) BLOB identified by the given BLOB key from the BLOB server.
+ *
+ * @param key
+ * the key to identify the BLOB (must not be <tt>null</tt>)
*
- * @param blobKey
- * the key to identify the BLOB
* @throws IOException
- * thrown if an I/O error occurs while transferring the request to
- * the BLOB server or if the BLOB server cannot delete the file
+ * thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+ * BLOB server cannot delete the file
*/
- public void delete(BlobKey blobKey) throws IOException {
- checkArgument(blobKey != null, "BLOB key must not be null.");
+ public void delete(BlobKey key) throws IOException {
+ deleteInternal(null, key);
+ }
+
+ /**
+ * Deletes the BLOB identified by the given BLOB key and job ID from the BLOB server.
+ *
+ * @param jobId
+ * the ID of the job the BLOB belongs to (must not be <tt>null</tt>)
+ * @param key
+ * the key to identify the BLOB (must not be <tt>null</tt>)
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+ * BLOB server cannot delete the file
+ */
+ public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+ checkNotNull(jobId);
+ deleteInternal(jobId, key);
+ }
+
+ /**
+ * Deletes the BLOB identified by the given BLOB key and job ID from the BLOB server.
+ *
+ * @param jobId
+ * the ID of job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
+ * @param key
+ * the key to identify the BLOB
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+ * BLOB server cannot delete the file
+ */
+ public void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOException {
+ checkNotNull(key);
try {
final OutputStream outputStream = this.socket.getOutputStream();
@@ -456,20 +584,16 @@ public final class BlobClient implements Closeable {
outputStream.write(DELETE_OPERATION);
// delete blob key
- outputStream.write(CONTENT_ADDRESSABLE);
- blobKey.writeToOutputStream(outputStream);
-
- int response = inputStream.read();
- if (response < 0) {
- throw new EOFException("Premature end of response");
- }
- if (response == RETURN_ERROR) {
- Throwable cause = readExceptionFromStream(inputStream);
- throw new IOException("Server side error: " + cause.getMessage(), cause);
- }
- else if (response != RETURN_OKAY) {
- throw new IOException("Unrecognized response");
+ if (jobId == null) {
+ outputStream.write(CONTENT_NO_JOB);
+ } else {
+ outputStream.write(CONTENT_FOR_JOB);
+ outputStream.write(jobId.getBytes());
}
+ key.writeToOutputStream(outputStream);
+
+ // the response is the same as for a GET request
+ receiveAndCheckGetResponse(inputStream);
}
catch (Throwable t) {
BlobUtils.closeSilently(socket, LOG);
@@ -479,11 +603,18 @@ public final class BlobClient implements Closeable {
/**
* Uploads the JAR files to a {@link BlobServer} at the given address.
+ * <p>
+ * TODO: add jobId to signature after adapting the BlobLibraryCacheManager
+ *
+ * @param serverAddress
+ * Server address of the {@link BlobServer}
+ * @param clientConfig
+ * Any additional configuration for the blob client
+ * @param jars
+ * List of JAR files to upload
*
- * @param serverAddress Server address of the {@link BlobServer}
- * @param clientConfig Any additional configuration for the blob client
- * @param jars List of JAR files to upload
- * @throws IOException Thrown if the upload fails
+ * @throws IOException
+ * if the upload fails
*/
public static List<BlobKey> uploadJarFiles(
InetSocketAddress serverAddress,
@@ -500,7 +631,7 @@ public final class BlobClient implements Closeable {
FSDataInputStream is = null;
try {
is = fs.open(jar);
- final BlobKey key = blobClient.put(is);
+ final BlobKey key = blobClient.putInputStream(null, is);
blobKeys.add(key);
} finally {
if (is != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index add9f7f..43a060a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -28,6 +29,8 @@ import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.FileNotFoundException;
@@ -40,7 +43,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -58,7 +61,7 @@ public class BlobServer extends Thread implements BlobService {
private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
/** Counter to generate unique names for temporary files. */
- private final AtomicInteger tempFileCounter = new AtomicInteger(0);
+ private final AtomicLong tempFileCounter = new AtomicLong(0);
/** The server socket listening for incoming connections. */
private final ServerSocket serverSocket;
@@ -110,7 +113,7 @@ public class BlobServer extends Thread implements BlobService {
// configure and create the storage directory
String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
- this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
+ this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
LOG.info("Created BLOB server storage directory {}", storageDir);
// configure the maximum number of concurrent connections
@@ -189,11 +192,12 @@ public class BlobServer extends Thread implements BlobService {
*
* <p><strong>This is only called from the {@link BlobServerConnection}</strong>
*
+ * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param key identifying the file
* @return file handle to the file
*/
- File getStorageLocation(BlobKey key) {
- return BlobUtils.getStorageLocation(storageDir, key);
+ File getStorageLocation(@Nullable JobID jobId, BlobKey key) {
+ return BlobUtils.getStorageLocation(storageDir, jobId, key);
}
/**
@@ -333,20 +337,69 @@ public class BlobServer extends Thread implements BlobService {
}
/**
- * Method which retrieves the local path of a file associated with a blob key. The blob server
- * looks the blob key up in its local storage. If the file exists, it is returned. If the
- * file does not exist, it is retrieved from the HA blob store (if available) or a
- * FileNotFoundException is thrown.
+ * Retrieves the local path of a (job-unrelated) file associated with a blob key.
+ * <p>
+ * The blob server looks the blob key up in its local storage. If the file exists, it is
+ * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
+ * or a {@link FileNotFoundException} is thrown.
*
- * @param requiredBlob blob key associated with the requested file
- * @return file referring to the local storage location of the BLOB.
- * @throws IOException Thrown if the file retrieval failed.
+ * @param key
+ * blob key associated with the requested file
+ *
+ * @return file referring to the local storage location of the BLOB
+ *
+ * @throws IOException
+ * Thrown if the file retrieval failed.
+ */
+ @Override
+ public File getFile(BlobKey key) throws IOException {
+ return getFileInternal(null, key);
+ }
+
+ /**
+ * Retrieves the local path of a file associated with a job and a blob key.
+ * <p>
+ * The blob server looks the blob key up in its local storage. If the file exists, it is
+ * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
+ * or a {@link FileNotFoundException} is thrown.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (must not be <tt>null</tt>)
+ * @param key
+ * blob key associated with the requested file
+ *
+ * @return file referring to the local storage location of the BLOB
+ *
+ * @throws IOException
+ * Thrown if the file retrieval failed.
*/
@Override
- public File getFile(BlobKey requiredBlob) throws IOException {
+ public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
+ checkNotNull(jobId);
+ return getFileInternal(jobId, key);
+ }
+
+ /**
+ * Retrieves the local path of a file associated with a job and a blob key.
+ * <p>
+ * The blob server looks the blob key up in its local storage. If the file exists, it is
+ * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
+ * or a {@link FileNotFoundException} is thrown.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+ * @param requiredBlob
+ * blob key associated with the requested file
+ *
+ * @return file referring to the local storage location of the BLOB
+ *
+ * @throws IOException
+ * Thrown if the file retrieval failed.
+ */
+ private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
checkArgument(requiredBlob != null, "BLOB key cannot be null.");
- final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
+ final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
if (localFile.exists()) {
return localFile;
@@ -354,10 +407,11 @@ public class BlobServer extends Thread implements BlobService {
else {
try {
// Try the blob store
- blobStore.get(requiredBlob, localFile);
+ blobStore.get(jobId, requiredBlob, localFile);
}
catch (Exception e) {
- throw new IOException("Failed to copy from blob store.", e);
+ throw new IOException(
+ "Failed to copy BLOB " + requiredBlob + " from blob store to " + localFile, e);
}
if (localFile.exists()) {
@@ -371,24 +425,58 @@ public class BlobServer extends Thread implements BlobService {
}
/**
- * This method deletes the file associated to the blob key if it exists in the local storage
- * of the blob server.
+ * Deletes the (job-unrelated) file associated with the blob key from both the local storage and
+ * the HA store of the blob server; a failed local deletion is only logged as a warning.
+ *
+ * @param key
+ * blob key associated with the file to be deleted
*
- * @param key associated with the file to be deleted
* @throws IOException
*/
@Override
public void delete(BlobKey key) throws IOException {
- final File localFile = BlobUtils.getStorageLocation(storageDir, key);
+ deleteInternal(null, key);
+ }
+
+ /**
+ * Deletes the file associated with the blob key from both the local storage and the HA store
+ * of the blob server; a failed local deletion is only logged as a warning.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (must not be <tt>null</tt>)
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException if deleting the BLOB from the HA blob store fails
+ */
+ @Override
+ public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+ checkNotNull(jobId);
+ deleteInternal(jobId, key);
+ }
+
+ /**
+ * Deletes the file associated with the blob key in both the local storage as well as in the HA
+ * store of the blob server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
+ */
+ void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOException {
+ final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
readWriteLock.writeLock().lock();
try {
if (!localFile.delete() && localFile.exists()) {
- LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+ LOG.warn("Failed to locally delete BLOB " + key + " at " + localFile.getAbsolutePath());
}
- blobStore.delete(key);
+ blobStore.delete(jobId, key);
} finally {
readWriteLock.writeLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 181211d..f1054c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -39,7 +39,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_NO_JOB;
import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
@@ -49,6 +50,7 @@ import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A BLOB connection handles a series of requests from a particular BLOB client.
@@ -83,12 +85,8 @@ class BlobServerConnection extends Thread {
super("BLOB connection for " + clientSocket.getRemoteSocketAddress());
setDaemon(true);
- if (blobServer == null) {
- throw new NullPointerException();
- }
-
this.clientSocket = clientSocket;
- this.blobServer = blobServer;
+ this.blobServer = checkNotNull(blobServer);
this.blobStore = blobServer.getBlobStore();
ReadWriteLock readWriteLock = blobServer.getReadWriteLock();
@@ -167,15 +165,16 @@ class BlobServerConnection extends Thread {
/**
* Handles an incoming GET request from a BLOB client.
- *
+ *
* @param inputStream
- * the input stream to read incoming data from
+ * the input stream to read incoming data from
* @param outputStream
- * the output stream to send data back to the client
+ * the output stream to send data back to the client
* @param buf
- * an auxiliary buffer for data serialization/deserialization
+ * an auxiliary buffer for data serialization/deserialization
+ *
* @throws IOException
- * thrown if an I/O error occurs while reading/writing data from/to the respective streams
+ * thrown if an I/O error occurs while reading/writing data from/to the respective streams
*/
private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
/*
@@ -187,25 +186,36 @@ class BlobServerConnection extends Thread {
* so a local cache makes more sense.
*/
- File blobFile;
- int contentAddressable = -1;
- JobID jobId = null;
- BlobKey blobKey = null;
+ final File blobFile;
+ final JobID jobId;
+ final BlobKey blobKey;
try {
- contentAddressable = inputStream.read();
+ final int mode = inputStream.read();
- if (contentAddressable < 0) {
+ if (mode < 0) {
throw new EOFException("Premature end of GET request");
}
- if (contentAddressable == CONTENT_ADDRESSABLE) {
- blobKey = BlobKey.readFromInputStream(inputStream);
- blobFile = blobServer.getStorageLocation(blobKey);
+
+ // Receive the job ID and key
+ if (mode == CONTENT_NO_JOB) {
+ jobId = null;
+ } else if (mode == CONTENT_FOR_JOB) {
+ byte[] jidBytes = new byte[JobID.SIZE];
+ readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+ jobId = JobID.fromByteArray(jidBytes);
+ } else {
+ throw new IOException("Unknown type of BLOB addressing: " + mode + '.');
}
- else {
- throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
+ blobKey = BlobKey.readFromInputStream(inputStream);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received GET request for BLOB {}/{} from {}.", jobId,
+ blobKey, clientSocket.getInetAddress());
}
+ blobFile = blobServer.getStorageLocation(jobId, blobKey);
+
// up to here, an error can give a good message
}
catch (Throwable t) {
@@ -214,7 +224,7 @@ class BlobServerConnection extends Thread {
writeErrorToStream(outputStream, t);
}
catch (IOException e) {
- // since we are in an exception case, it means not much that we could not send the error
+ // since we are in an exception case, it means that we could not send the error
// ignore this
}
clientSocket.close();
@@ -224,6 +234,7 @@ class BlobServerConnection extends Thread {
readLock.lock();
try {
+ // copy the file to local store if it does not exist yet
try {
if (!blobFile.exists()) {
// first we have to release the read lock in order to acquire the write lock
@@ -232,9 +243,9 @@ class BlobServerConnection extends Thread {
try {
if (blobFile.exists()) {
- LOG.debug("Blob file {} has downloaded from the BlobStore by a different connection.", blobFile);
+ LOG.debug("Blob file {} has been downloaded from the (distributed) blob store by a different connection.", blobFile);
} else {
- blobStore.get(blobKey, blobFile);
+ blobStore.get(jobId, blobKey, blobFile);
}
} finally {
writeLock.unlock();
@@ -248,6 +259,7 @@ class BlobServerConnection extends Thread {
}
}
+ // enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
if (blobFile.length() > Integer.MAX_VALUE) {
throw new IOException("BLOB size exceeds the maximum size (2 GB).");
}
@@ -259,7 +271,7 @@ class BlobServerConnection extends Thread {
writeErrorToStream(outputStream, t);
}
catch (IOException e) {
- // since we are in an exception case, it means not much that we could not send the error
+ // since we are in an exception case, it means that we could not send the error
// ignore this
}
clientSocket.close();
@@ -294,59 +306,48 @@ class BlobServerConnection extends Thread {
/**
* Handles an incoming PUT request from a BLOB client.
- *
- * @param inputStream The input stream to read incoming data from.
- * @param outputStream The output stream to send data back to the client.
- * @param buf An auxiliary buffer for data serialization/deserialization.
+ *
+ * @param inputStream
+ * The input stream to read incoming data from
+ * @param outputStream
+ * The output stream to send data back to the client
+ * @param buf
+ * An auxiliary buffer for data serialization/deserialization
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while reading/writing data from/to the respective streams
*/
private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
- JobID jobID = null;
- MessageDigest md = null;
-
File incomingFile = null;
- FileOutputStream fos = null;
try {
- final int contentAddressable = inputStream.read();
- if (contentAddressable < 0) {
+ final int mode = inputStream.read();
+
+ if (mode < 0) {
throw new EOFException("Premature end of PUT request");
}
- if (contentAddressable == CONTENT_ADDRESSABLE) {
- md = BlobUtils.createMessageDigest();
- }
- else {
+ // Receive the job ID and key
+ final JobID jobId;
+ if (mode == CONTENT_NO_JOB) {
+ jobId = null;
+ } else if (mode == CONTENT_FOR_JOB) {
+ byte[] jidBytes = new byte[JobID.SIZE];
+ readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+ jobId = JobID.fromByteArray(jidBytes);
+ } else {
throw new IOException("Unknown type of BLOB addressing.");
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Received PUT request for content addressable BLOB");
+ LOG.debug("Received PUT request for BLOB of job {} with from {}.", jobId,
+ clientSocket.getInetAddress());
}
incomingFile = blobServer.createTemporaryFilename();
- fos = new FileOutputStream(incomingFile);
+ BlobKey blobKey = readFileFully(inputStream, incomingFile, buf);
- while (true) {
- final int bytesExpected = readLength(inputStream);
- if (bytesExpected == -1) {
- // done
- break;
- }
- if (bytesExpected > BUFFER_SIZE) {
- throw new IOException("Unexpected number of incoming bytes: " + bytesExpected);
- }
-
- readFully(inputStream, buf, 0, bytesExpected, "buffer");
- fos.write(buf, 0, bytesExpected);
-
- if (md != null) {
- md.update(buf, 0, bytesExpected);
- }
- }
- fos.close();
-
- BlobKey blobKey = new BlobKey(md.digest());
- File storageFile = blobServer.getStorageLocation(blobKey);
+ File storageFile = blobServer.getStorageLocation(jobId, blobKey);
writeLock.lock();
@@ -369,13 +370,15 @@ class BlobServerConnection extends Thread {
// only the one moving the incoming file to its final destination is allowed to upload the
// file to the blob store
- blobStore.put(storageFile, blobKey);
+ blobStore.put(storageFile, jobId, blobKey);
+ } else {
+ LOG.warn("File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.", blobKey, jobId);
}
} catch(IOException ioe) {
// we failed to either create the local storage file or to upload it --> try to delete the local file
// while still having the write lock
- if (storageFile.exists() && !storageFile.delete()) {
- LOG.warn("Could not delete the storage file.");
+ if (!storageFile.delete() && storageFile.exists()) {
+ LOG.warn("Could not delete the storage file with key {} and job {}.", blobKey, jobId);
}
throw ioe;
@@ -403,15 +406,8 @@ class BlobServerConnection extends Thread {
clientSocket.close();
}
finally {
- if (fos != null) {
- try {
- fos.close();
- } catch (Throwable t) {
- LOG.warn("Cannot close stream to BLOB staging file", t);
- }
- }
if (incomingFile != null) {
- if (!incomingFile.delete()) {
+ if (!incomingFile.delete() && incomingFile.exists()) {
LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath());
}
}
@@ -419,27 +415,87 @@ class BlobServerConnection extends Thread {
}
/**
+ * Reads a full file from <tt>inputStream</tt> into <tt>incomingFile</tt> returning its content hash.
+ *
+ * @param inputStream
+ * stream to read from
+ * @param incomingFile
+ * file to write to
+ * @param buf
+ * An auxiliary buffer for data serialization/deserialization (at least BUFFER_SIZE bytes)
+ *
+ * @return the received file's content hash as a BLOB key
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while reading/writing data or if a chunk length is invalid
+ */
+ private static BlobKey readFileFully(
+ final InputStream inputStream, final File incomingFile, final byte[] buf)
+ throws IOException {
+ MessageDigest md = BlobUtils.createMessageDigest();
+ FileOutputStream fos = new FileOutputStream(incomingFile);
+ // each chunk is preceded by its length; a length of -1 terminates the transfer
+ try {
+ while (true) {
+ final int bytesExpected = readLength(inputStream);
+ if (bytesExpected == -1) {
+ // end-of-data marker sent by the client
+ break;
+ }
+ if (bytesExpected < 0 || bytesExpected > BUFFER_SIZE) {
+ throw new IOException(
+ "Unexpected number of incoming bytes: " + bytesExpected);
+ }
+
+ readFully(inputStream, buf, 0, bytesExpected, "buffer");
+ fos.write(buf, 0, bytesExpected);
+
+ md.update(buf, 0, bytesExpected);
+ }
+ return new BlobKey(md.digest());
+ } finally {
+ try {
+ fos.close();
+ } catch (Throwable t) {
+ LOG.warn("Cannot close stream to BLOB staging file", t);
+ }
+ }
+ }
+
+ /**
* Handles an incoming DELETE request from a BLOB client.
- *
- * @param inputStream The input stream to read the request from.
- * @param outputStream The output stream to write the response to.
- * @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream.
+ *
+ * @param inputStream
+ * The input stream to read the request from.
+ * @param outputStream
+ * The output stream to write the response to.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while reading the request data from the input stream.
*/
private void delete(InputStream inputStream, OutputStream outputStream) throws IOException {
try {
- int type = inputStream.read();
- if (type < 0) {
+ final int mode = inputStream.read();
+
+ if (mode < 0) {
throw new EOFException("Premature end of DELETE request");
}
- if (type == CONTENT_ADDRESSABLE) {
- BlobKey key = BlobKey.readFromInputStream(inputStream);
- blobServer.delete(key);
- }
- else {
- throw new IOException("Unrecognized addressing type: " + type);
+ // Receive the job ID and key
+ final JobID jobId;
+ if (mode == CONTENT_NO_JOB) {
+ jobId = null;
+ } else if (mode == CONTENT_FOR_JOB) {
+ byte[] jidBytes = new byte[JobID.SIZE];
+ readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+ jobId = JobID.fromByteArray(jidBytes);
+ } else {
+ throw new IOException("Unknown type of BLOB addressing.");
}
+ BlobKey key = BlobKey.readFromInputStream(inputStream);
+
+ blobServer.deleteInternal(jobId, key);
outputStream.write(RETURN_OKAY);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
index d8ac833..681fc81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
@@ -42,12 +42,20 @@ public class BlobServerProtocol {
static final byte RETURN_ERROR = 1;
/**
- * Internal code to identify a reference via content hash as the key.
+ * Internal code to identify a job-unrelated reference via content hash as the key.
* <p>
* Note: previously, there was also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>) and
* <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
*/
- static final byte CONTENT_ADDRESSABLE = 0;
+ static final byte CONTENT_NO_JOB = 0;
+
+ /**
+ * Internal code to identify a job-related reference via content hash as the key.
+ * <p>
+ * Note: previously, there was also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>) and
+ * <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
+ */
+ static final byte CONTENT_FOR_JOB = 3;
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index 1e56f26..a78c88c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.api.common.JobID;
+
+import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@@ -28,7 +31,8 @@ import java.io.IOException;
public interface BlobService extends Closeable {
/**
- * Returns the path to a local copy of the file associated with the provided blob key.
+ * Returns the path to a local copy of the (job-unrelated) file associated with the provided
+ * blob key.
*
* @param key blob key associated with the requested file
* @return The path to the file.
@@ -37,9 +41,19 @@ public interface BlobService extends Closeable {
*/
File getFile(BlobKey key) throws IOException;
+ /**
+ * Returns the path to a local copy of the file associated with the provided job ID and blob key.
+ *
+ * @param jobId ID of the job this blob belongs to
+ * @param key blob key associated with the requested file
+ * @return The path to the file.
+ * @throws java.io.FileNotFoundException when the path does not exist;
+ * @throws IOException if any other error occurs when retrieving the file
+ */
+ File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException;
/**
- * Deletes the file associated with the provided blob key.
+ * Deletes the (job-unrelated) file associated with the provided blob key.
*
* @param key associated with the file to be deleted
* @throws IOException
@@ -47,10 +61,19 @@ public interface BlobService extends Closeable {
void delete(BlobKey key) throws IOException;
/**
+ * Deletes the file associated with the provided job ID and blob key.
+ *
+ * @param jobId ID of the job this blob belongs to
+ * @param key associated with the file to be deleted
+ * @throws IOException
+ */
+ void delete(@Nonnull JobID jobId, BlobKey key) throws IOException;
+
+ /**
* Returns the port of the blob service.
* @return the port of the blob service.
*/
int getPort();
-
+
BlobClient createClient() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 1e8b73a..d2ea8ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -32,19 +32,21 @@ public interface BlobStore extends BlobView {
* Copies the local file to the blob store.
*
* @param localFile The file to copy
+ * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey The ID for the file in the blob store
* @throws IOException If the copy fails
*/
- void put(File localFile, BlobKey blobKey) throws IOException;
+ void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;
/**
* Tries to delete a blob from storage.
*
* <p>NOTE: This also tries to delete any created directories if empty.</p>
*
+ * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey The blob ID
*/
- void delete(BlobKey blobKey);
+ void delete(JobID jobId, BlobKey blobKey);
/**
* Tries to delete all blobs for the given job from storage.
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index e8f3fe5..9a13412 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
@@ -62,6 +64,11 @@ public class BlobUtils {
private static final String JOB_DIR_PREFIX = "job_";
/**
+ * The prefix of all job-unrelated directories created by the BLOB server.
+ */
+ private static final String NO_JOB_DIR_PREFIX = "no_job";
+
+ /**
* Creates a BlobStore based on the parameters set in the configuration.
*
* @param config
@@ -116,26 +123,29 @@ public class BlobUtils {
}
/**
- * Creates a storage directory for a blob service.
+ * Creates a local storage directory for a blob service under the given parent directory.
*
- * @return the storage directory used by a BLOB service
+ * @param basePath
+ * base path, i.e. parent directory, of the storage directory to use (if <tt>null</tt> or
+ * empty, the path in <tt>java.io.tmpdir</tt> will be used)
+ *
+ * @return a new local storage directory
*
* @throws IOException
- * thrown if the (local or distributed) file storage cannot be created or
- * is not usable
+ * thrown if the local file storage cannot be created or is not usable
*/
- static File initStorageDirectory(String storageDirectory) throws
- IOException {
+ static File initLocalStorageDirectory(String basePath) throws IOException {
File baseDir;
- if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) {
+ if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
baseDir = new File(System.getProperty("java.io.tmpdir"));
}
else {
- baseDir = new File(storageDirectory);
+ baseDir = new File(basePath);
}
File storageDir;
+ // NOTE: although we will be using UUIDs, there may be collisions
final int MAX_ATTEMPTS = 10;
for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
storageDir = new File(baseDir, String.format(
@@ -143,7 +153,7 @@ public class BlobUtils {
// Create the storage dir if it doesn't exist. Only return it when the operation was
// successful.
- if (!storageDir.exists() && storageDir.mkdirs()) {
+ if (storageDir.mkdirs()) {
return storageDir;
}
}
@@ -153,46 +163,108 @@ public class BlobUtils {
}
/**
- * Returns the BLOB service's directory for incoming files. The directory is created if it did
- * not exist so far.
+ * Returns the BLOB service's directory for incoming (job-unrelated) files. The directory is
+ * created if it does not exist yet.
+ *
+ * @param storageDir
+ * storage directory used by the BLOB service
*
- * @return the BLOB server's directory for incoming files
+ * @return the BLOB service's directory for incoming files
*/
static File getIncomingDirectory(File storageDir) {
final File incomingDir = new File(storageDir, "incoming");
- if (!incomingDir.mkdirs() && !incomingDir.exists()) {
- throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath());
- }
+ mkdirTolerateExisting(incomingDir, "incoming");
return incomingDir;
}
/**
- * Returns the BLOB service's directory for cached files. The directory is created if it did
- * not exist so far.
+ * Makes sure a given directory exists by creating it if necessary.
*
- * @return the BLOB server's directory for cached files
+ * @param dir
+ * directory to create
+ * @param dirType
+ * the type of the directory (included in error message if something fails)
*/
- private static File getCacheDirectory(File storageDir) {
- final File cacheDirectory = new File(storageDir, "cache");
-
- if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) {
- throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'.");
+ private static void mkdirTolerateExisting(final File dir, final String dirType) {
+ // note: thread-safe create should try to mkdir first and then ignore the case that the
+ // directory already existed
+ if (!dir.mkdirs() && !dir.exists()) {
+ throw new RuntimeException(
+ "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
}
-
- return cacheDirectory;
}
/**
* Returns the (designated) physical storage location of the BLOB with the given key.
*
+ * @param storageDir
+ * storage directory used by the BLOB service
* @param key
- * the key identifying the BLOB
+ * the key identifying the BLOB
+ * @param jobId
+ * ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
+ *
* @return the (designated) physical storage location of the BLOB
*/
- static File getStorageLocation(File storageDir, BlobKey key) {
- return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString());
+ static File getStorageLocation(
+ @Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
+ File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
+
+ mkdirTolerateExisting(file.getParentFile(), "cache");
+
+ return file;
+ }
+
+ /**
+ * Returns the BLOB server's storage directory for BLOBs belonging to the job with the given ID
+ * <em>without</em> creating the directory.
+ *
+ * @param storageDir
+ * storage directory used by the BLOB service
+ * @param jobId
+ * the ID of the job to return the storage directory for
+ *
+ * @return the storage directory for BLOBs belonging to the job with the given ID
+ */
+ static String getStorageLocationPath(@Nonnull String storageDir, @Nullable JobID jobId) {
+ if (jobId == null) {
+ // format: $base/no_job
+ return String.format("%s/%s", storageDir, NO_JOB_DIR_PREFIX);
+ } else {
+ // format: $base/job_$jobId
+ return String.format("%s/%s%s", storageDir, JOB_DIR_PREFIX, jobId.toString());
+ }
+ }
+
+ /**
+ * Returns the path for the given blob key.
+ * <p>
+ * The returned path can be used with the (local or HA) BLOB store file system back-end for
+ * recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
+ * BlobKey)}.
+ *
+ * @param storageDir
+ * storage directory used by the BLOB service
+ * @param key
+ * the key identifying the BLOB
+ * @param jobId
+ * ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
+ *
+ * @return the path to the given BLOB
+ */
+ static String getStorageLocationPath(
+ @Nonnull String storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
+ if (jobId == null) {
+ // format: $base/no_job/blob_$key
+ return String.format("%s/%s/%s%s",
+ storageDir, NO_JOB_DIR_PREFIX, BLOB_FILE_PREFIX, key.toString());
+ } else {
+ // format: $base/job_$jobId/blob_$key
+ return String.format("%s/%s%s/%s%s",
+ storageDir, JOB_DIR_PREFIX, jobId.toString(), BLOB_FILE_PREFIX, key.toString());
+ }
}
/**
@@ -200,6 +272,7 @@ public class BlobUtils {
*
* @return a new instance of the message digest to use for the BLOB key computation
*/
+ @Nonnull
static MessageDigest createMessageDigest() {
try {
return MessageDigest.getInstance(HASHING_ALGORITHM);
@@ -333,28 +406,6 @@ public class BlobUtils {
}
/**
- * Returns the path for the given blob key.
- *
- * <p>The returned path can be used with the state backend for recovery purposes.
- *
- * <p>This follows the same scheme as {@link #getStorageLocation(File, BlobKey)}
- * and is used for HA.
- */
- static String getRecoveryPath(String basePath, BlobKey blobKey) {
- // format: $base/cache/blob_$key
- return String.format("%s/cache/%s%s", basePath, BLOB_FILE_PREFIX, blobKey.toString());
- }
-
- /**
- * Returns the path for the given job ID.
- *
- * <p>The returned path can be used with the state backend for recovery purposes.
- */
- static String getRecoveryPath(String basePath, JobID jobId) {
- return String.format("%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString());
- }
-
- /**
* Private constructor to prevent instantiation.
*/
private BlobUtils() {
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
index 2e2e4a7..8916d95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.api.common.JobID;
+
import java.io.File;
import java.io.IOException;
@@ -29,9 +31,10 @@ public interface BlobView {
/**
* Copies a blob to a local file.
*
+ * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* @param blobKey The blob ID
* @param localFile The local file to copy to
* @throws IOException If the copy fails
*/
- void get(BlobKey blobKey, File localFile) throws IOException;
+ void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 5f8058b..83abecb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -64,8 +64,8 @@ public class FileSystemBlobStore implements BlobStoreService {
// - Put ------------------------------------------------------------------
@Override
- public void put(File localFile, BlobKey blobKey) throws IOException {
- put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey));
+ public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+ put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
}
private void put(File fromFile, String toBlobPath) throws IOException {
@@ -78,8 +78,8 @@ public class FileSystemBlobStore implements BlobStoreService {
// - Get ------------------------------------------------------------------
@Override
- public void get(BlobKey blobKey, File localFile) throws IOException {
- get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile);
+ public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+ get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile);
}
private void get(String fromBlobPath, File toFile) throws IOException {
@@ -112,13 +112,13 @@ public class FileSystemBlobStore implements BlobStoreService {
// - Delete ---------------------------------------------------------------
@Override
- public void delete(BlobKey blobKey) {
- delete(BlobUtils.getRecoveryPath(basePath, blobKey));
+ public void delete(JobID jobId, BlobKey blobKey) {
+ delete(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
}
@Override
public void deleteAll(JobID jobId) {
- delete(BlobUtils.getRecoveryPath(basePath, jobId));
+ delete(BlobUtils.getStorageLocationPath(basePath, jobId));
}
private void delete(String blobPath) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 6e2bb53..95be569 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -29,16 +29,15 @@ import java.io.IOException;
public class VoidBlobStore implements BlobStoreService {
@Override
- public void put(File localFile, BlobKey blobKey) throws IOException {
+ public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
}
-
@Override
- public void get(BlobKey blobKey, File localFile) throws IOException {
+ public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
}
@Override
- public void delete(BlobKey blobKey) {
+ public void delete(JobID jobId, BlobKey blobKey) {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index e3657ff..9cc6210 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -234,6 +234,7 @@ public class JobClient {
int pos = 0;
for (BlobKey blobKey : props.requiredJarFiles()) {
try {
+ // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
allURLs[pos++] = blobClient.getFile(blobKey).toURI().toURL();
} catch (Exception e) {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 1c68515..6b92d79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -535,6 +535,7 @@ public class JobGraph implements Serializable {
InetSocketAddress blobServerAddress,
Configuration blobClientConfig) throws IOException {
if (!userJars.isEmpty()) {
+ // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
List<BlobKey> blobKeys = BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, userJars);
for (BlobKey blobKey : blobKeys) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index fe763fa..8c575a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -25,6 +26,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
@@ -41,7 +43,8 @@ public class BlobCacheRetriesTest {
public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
- * A test where the connection fails twice and then the get operation succeeds.
+ * A test where the connection fails twice and then the get operation succeeds
+ * (job-unrelated blob).
*/
@Test
public void testBlobFetchRetries() throws IOException {
@@ -49,15 +52,41 @@ public class BlobCacheRetriesTest {
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
- testBlobFetchRetries(config, new VoidBlobStore());
+ testBlobFetchRetries(config, new VoidBlobStore(), null);
+ }
+
+ /**
+ * A test where the connection fails twice and then the get operation succeeds
+ * (job-related blob).
+ */
+ @Test
+ public void testBlobForJobFetchRetries() throws IOException {
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ temporaryFolder.newFolder().getAbsolutePath());
+
+ testBlobFetchRetries(config, new VoidBlobStore(), new JobID());
+ }
+
+ /**
+ * A test where the connection fails twice and then the get operation succeeds
+ * (with high availability set, job-unrelated blob).
+ */
+ @Test
+ public void testBlobNoJobFetchRetriesHa() throws IOException {
+ testBlobFetchRetriesHa(null);
}
/**
* A test where the connection fails twice and then the get operation succeeds
- * (with high availability set).
+ * (with high availability set, job-related blob).
*/
@Test
public void testBlobFetchRetriesHa() throws IOException {
+ testBlobFetchRetriesHa(new JobID());
+ }
+
+ private void testBlobFetchRetriesHa(final JobID jobId) throws IOException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
@@ -70,7 +99,7 @@ public class BlobCacheRetriesTest {
try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
- testBlobFetchRetries(config, blobStoreService);
+ testBlobFetchRetries(config, blobStoreService, jobId);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
@@ -86,7 +115,9 @@ public class BlobCacheRetriesTest {
* configuration to use (the BlobCache will get some additional settings
* set compared to this one)
*/
- private void testBlobFetchRetries(final Configuration config, final BlobStore blobStore) throws IOException {
+ private static void testBlobFetchRetries(
+ final Configuration config, final BlobStore blobStore, final JobID jobId)
+ throws IOException {
final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
BlobServer server = null;
@@ -104,7 +135,7 @@ public class BlobCacheRetriesTest {
try {
blobClient = new BlobClient(serverAddress, config);
- key = blobClient.put(data);
+ key = blobClient.put(jobId, data);
}
finally {
if (blobClient != null) {
@@ -115,16 +146,13 @@ public class BlobCacheRetriesTest {
cache = new BlobCache(serverAddress, config, new VoidBlobStore());
// trigger a download - it should fail the first two times, but retry, and succeed eventually
- URL url = cache.getFile(key).toURI().toURL();
- InputStream is = url.openStream();
- try {
+ File file = jobId == null ? cache.getFile(key) : cache.getFile(jobId, key);
+ URL url = file.toURI().toURL();
+ try (InputStream is = url.openStream()) {
byte[] received = new byte[data.length];
assertEquals(data.length, is.read(received));
assertArrayEquals(data, received);
}
- finally {
- is.close();
- }
} finally {
if (cache != null) {
cache.close();
@@ -136,23 +164,50 @@ public class BlobCacheRetriesTest {
}
/**
- * A test where the connection fails too often and eventually fails the GET request.
+ * A test where the connection fails too often and eventually fails the GET request
+ * (job-unrelated blob).
+ */
+ @Test
+ public void testBlobNoJobFetchWithTooManyFailures() throws IOException {
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ temporaryFolder.newFolder().getAbsolutePath());
+
+ testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), null);
+ }
+
+ /**
+ * A test where the connection fails too often and eventually fails the GET request (job-related
+ * blob).
*/
@Test
- public void testBlobFetchWithTooManyFailures() throws IOException {
+ public void testBlobForJobFetchWithTooManyFailures() throws IOException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
- testBlobFetchWithTooManyFailures(config, new VoidBlobStore());
+ testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), new JobID());
+ }
+
+ /**
+ * A test where the connection fails too often and eventually fails the GET request
+ * (with high availability set, job-unrelated blob).
+ */
+ @Test
+ public void testBlobNoJobFetchWithTooManyFailuresHa() throws IOException {
+ testBlobFetchWithTooManyFailuresHa(null);
}
/**
* A test where the connection fails twice and then the get operation succeeds
- * (with high availability set).
+ * (with high availability set, job-related blob).
*/
@Test
- public void testBlobFetchWithTooManyFailuresHa() throws IOException {
+ public void testBlobForJobFetchWithTooManyFailuresHa() throws IOException {
+ testBlobFetchWithTooManyFailuresHa(new JobID());
+ }
+
+ private void testBlobFetchWithTooManyFailuresHa(final JobID jobId) throws IOException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
@@ -165,7 +220,7 @@ public class BlobCacheRetriesTest {
try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
- testBlobFetchWithTooManyFailures(config, blobStoreService);
+ testBlobFetchWithTooManyFailures(config, blobStoreService, jobId);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
@@ -181,7 +236,9 @@ public class BlobCacheRetriesTest {
* configuration to use (the BlobCache will get some additional settings
* set compared to this one)
*/
- private void testBlobFetchWithTooManyFailures(final Configuration config, final BlobStore blobStore) throws IOException {
+ private static void testBlobFetchWithTooManyFailures(
+ final Configuration config, final BlobStore blobStore, final JobID jobId)
+ throws IOException {
final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
BlobServer server = null;
@@ -199,7 +256,7 @@ public class BlobCacheRetriesTest {
try {
blobClient = new BlobClient(serverAddress, config);
- key = blobClient.put(data);
+ key = blobClient.put(jobId, data);
}
finally {
if (blobClient != null) {
@@ -211,7 +268,11 @@ public class BlobCacheRetriesTest {
// trigger a download - it should fail eventually
try {
- cache.getFile(key);
+ if (jobId == null) {
+ cache.getFile(key);
+ } else {
+ cache.getFile(jobId, key);
+ }
fail("This should fail");
}
catch (IOException e) {