You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:07:05 UTC
[12/14] flink git commit: [FLINK-7068][blob] Introduce permanent and
transient BLOB keys
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index 9e2d165..d6fab50 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -55,7 +55,7 @@ public class BlobCacheCleanupTest extends TestLogger {
public void testJobCleanup() throws IOException, InterruptedException {
JobID jobId = new JobID();
- List<BlobKey> keys = new ArrayList<>();
+ List<PermanentBlobKey> keys = new ArrayList<>();
BlobServer server = null;
PermanentBlobCache cache = null;
@@ -73,9 +73,9 @@ public class BlobCacheCleanupTest extends TestLogger {
cache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
// upload blobs
- keys.add(server.putHA(jobId, buf));
+ keys.add(server.putPermanent(jobId, buf));
buf[0] += 1;
- keys.add(server.putHA(jobId, buf));
+ keys.add(server.putPermanent(jobId, buf));
checkFileCountForJob(2, jobId, server);
checkFileCountForJob(0, jobId, cache);
@@ -86,14 +86,14 @@ public class BlobCacheCleanupTest extends TestLogger {
checkFileCountForJob(2, jobId, server);
checkFileCountForJob(0, jobId, cache);
- for (BlobKey key : keys) {
- cache.getHAFile(jobId, key);
+ for (PermanentBlobKey key : keys) {
+ cache.getFile(jobId, key);
}
// register again (let's say, from another thread or so)
cache.registerJob(jobId);
- for (BlobKey key : keys) {
- cache.getHAFile(jobId, key);
+ for (PermanentBlobKey key : keys) {
+ cache.getFile(jobId, key);
}
assertEquals(2, checkFilesExist(jobId, keys, cache, true));
@@ -109,21 +109,7 @@ public class BlobCacheCleanupTest extends TestLogger {
// after releasing the second time, the job is up for deferred cleanup
cache.releaseJob(jobId);
-
- // because we cannot guarantee that there are not thread races in the build system, we
- // loop for a certain while until the references disappear
- {
- long deadline = System.currentTimeMillis() + 30_000L;
- do {
- Thread.sleep(100);
- }
- while (checkFilesExist(jobId, keys, cache, false) != 0 &&
- System.currentTimeMillis() < deadline);
- }
-
- // the blob cache should no longer contain the files
- // this fails if we exited via a timeout
- checkFileCountForJob(0, jobId, cache);
+ verifyJobCleanup(cache, jobId, keys);
// server should be unaffected
checkFileCountForJob(2, jobId, server);
}
@@ -141,8 +127,8 @@ public class BlobCacheCleanupTest extends TestLogger {
}
/**
- * Tests that {@link BlobCache} sets the expected reference counts and cleanup timeouts when
- * registering, releasing, and re-registering jobs.
+ * Tests that {@link PermanentBlobCache} sets the expected reference counts and cleanup timeouts
+ * when registering, releasing, and re-registering jobs.
*/
@Test
public void testJobReferences() throws IOException, InterruptedException {
@@ -208,7 +194,7 @@ public class BlobCacheCleanupTest extends TestLogger {
long cleanupInterval = 5L;
JobID jobId = new JobID();
- List<BlobKey> keys = new ArrayList<BlobKey>();
+ List<PermanentBlobKey> keys = new ArrayList<>();
BlobServer server = null;
PermanentBlobCache cache = null;
@@ -226,9 +212,9 @@ public class BlobCacheCleanupTest extends TestLogger {
cache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
// upload blobs
- keys.add(server.putHA(jobId, buf));
+ keys.add(server.putPermanent(jobId, buf));
buf[0] += 1;
- keys.add(server.putHA(jobId, buf));
+ keys.add(server.putPermanent(jobId, buf));
checkFileCountForJob(2, jobId, server);
checkFileCountForJob(0, jobId, cache);
@@ -239,14 +225,14 @@ public class BlobCacheCleanupTest extends TestLogger {
checkFileCountForJob(2, jobId, server);
checkFileCountForJob(0, jobId, cache);
- for (BlobKey key : keys) {
- cache.getHAFile(jobId, key);
+ for (PermanentBlobKey key : keys) {
+ cache.getFile(jobId, key);
}
// register again (let's say, from another thread or so)
cache.registerJob(jobId);
- for (BlobKey key : keys) {
- cache.getHAFile(jobId, key);
+ for (PermanentBlobKey key : keys) {
+ cache.getFile(jobId, key);
}
assertEquals(2, checkFilesExist(jobId, keys, cache, true));
@@ -275,20 +261,7 @@ public class BlobCacheCleanupTest extends TestLogger {
Thread.sleep((cleanupInterval * 4) / 5);
// files are up for cleanup now...wait for it:
- // because we cannot guarantee that there are not thread races in the build system, we
- // loop for a certain while until the references disappear
- {
- long deadline = System.currentTimeMillis() + 30_000L;
- do {
- Thread.sleep(100);
- }
- while (checkFilesExist(jobId, keys, cache, false) != 0 &&
- System.currentTimeMillis() < deadline);
- }
-
- // the blob cache should no longer contain the files
- // this fails if we exited via a timeout
- checkFileCountForJob(0, jobId, cache);
+ verifyJobCleanup(cache, jobId, keys);
// server should be unaffected
checkFileCountForJob(2, jobId, server);
}
@@ -306,6 +279,36 @@ public class BlobCacheCleanupTest extends TestLogger {
}
/**
+ * Checks that BLOBs for the given <tt>jobId</tt> are eventually cleaned up, waiting at most 30s.
+ * The caller must already have called {@link PermanentBlobCache#releaseJob(JobID)}; this
+ * method does not do so itself.
+ *
+ * @param cache
+ * BLOB cache
+ * @param jobId
+ * job ID or <tt>null</tt> if job-unrelated
+ * @param keys
+ * keys identifying BLOBs which were previously registered for the <tt>jobId</tt>
+ */
+ static void verifyJobCleanup(PermanentBlobCache cache, JobID jobId, List<? extends BlobKey> keys)
+ throws InterruptedException, IOException {
+ // because we cannot guarantee that there are not thread races in the build system, we
+ // loop for a certain while until the references disappear
+ {
+ long deadline = System.currentTimeMillis() + 30_000L;
+ do {
+ Thread.sleep(100);
+ }
+ while (checkFilesExist(jobId, keys, cache, false) != 0 &&
+ System.currentTimeMillis() < deadline);
+ }
+
+ // the blob cache should no longer contain the files
+ // this fails if we exited via a timeout
+ checkFileCountForJob(0, jobId, cache);
+ }
+
+ /**
* Checks how many of the files given by blob keys are accessible.
*
* @param jobId
@@ -317,11 +320,12 @@ public class BlobCacheCleanupTest extends TestLogger {
* @param doThrow
* whether exceptions should be ignored (<tt>false</tt>), or thrown (<tt>true</tt>)
*
- * @return number of files we were able to retrieve via {@link PermanentBlobService#getHAFile}
+ * @return number of files existing at {@link BlobServer#getStorageLocation(JobID, BlobKey)} and
+ * {@link PermanentBlobCache#getStorageLocation(JobID, BlobKey)}, respectively
*/
public static int checkFilesExist(
- JobID jobId, Collection<BlobKey> keys, PermanentBlobService blobService, boolean doThrow)
- throws IOException {
+ JobID jobId, Collection<? extends BlobKey> keys, PermanentBlobService blobService, boolean doThrow)
+ throws IOException {
int numFiles = 0;
@@ -361,10 +365,10 @@ public class BlobCacheCleanupTest extends TestLogger {
final File jobDir;
if (blobService instanceof BlobServer) {
BlobServer server = (BlobServer) blobService;
- jobDir = server.getStorageLocation(jobId, new BlobKey()).getParentFile();
+ jobDir = server.getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
} else {
PermanentBlobCache cache = (PermanentBlobCache) blobService;
- jobDir = cache.getStorageLocation(jobId, new BlobKey()).getParentFile();
+ jobDir = cache.getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
}
File[] blobsForJob = jobDir.listFiles();
if (blobsForJob == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
index 0a2882d..a0e46ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.util.TestLogger;
@@ -32,6 +31,8 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -39,6 +40,8 @@ import java.nio.file.Files;
import java.util.Arrays;
import java.util.Random;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.hamcrest.CoreMatchers.containsString;
@@ -48,7 +51,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
- * Tests how GET requests react to corrupt files when downloaded via a {@link BlobCache}.
+ * Tests how GET requests react to corrupt files when downloaded via a {@link BlobCacheService}.
*
* <p>Successful GET requests are tested in conjunction wit the PUT requests.
*/
@@ -62,22 +65,22 @@ public class BlobCacheCorruptionTest extends TestLogger {
@Test
public void testGetFailsFromCorruptFile1() throws IOException {
- testGetFailsFromCorruptFile(null, false, false);
+ testGetFailsFromCorruptFile(null, TRANSIENT_BLOB, false);
}
@Test
public void testGetFailsFromCorruptFile2() throws IOException {
- testGetFailsFromCorruptFile(new JobID(), false, false);
+ testGetFailsFromCorruptFile(new JobID(), TRANSIENT_BLOB, false);
}
@Test
public void testGetFailsFromCorruptFile3() throws IOException {
- testGetFailsFromCorruptFile(new JobID(), true, false);
+ testGetFailsFromCorruptFile(new JobID(), PERMANENT_BLOB, false);
}
@Test
public void testGetFailsFromCorruptFile4() throws IOException {
- testGetFailsFromCorruptFile(new JobID(), true, true);
+ testGetFailsFromCorruptFile(new JobID(), PERMANENT_BLOB, true);
}
/**
@@ -86,19 +89,18 @@ public class BlobCacheCorruptionTest extends TestLogger {
*
* @param jobId
* job ID or <tt>null</tt> if job-unrelated
- * @param highAvailability
- * whether to use HA mode accessors
+ * @param blobType
+ * whether the BLOB should become permanent or transient
* @param corruptOnHAStore
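- * whether the file should be corrupt in the HA store (<tt>true</tt>, required
- * <tt>highAvailability</tt> to be set) or on the {@link BlobServer}'s local store
+ * whether the file should be corrupt in the HA store (<tt>true</tt>, requires
+ * <tt>blobType</tt> to be permanent) or on the {@link BlobServer}'s local store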
* (<tt>false</tt>)
*/
- private void testGetFailsFromCorruptFile(final JobID jobId, boolean highAvailability,
+ private void testGetFailsFromCorruptFile(final JobID jobId, BlobKey.BlobType blobType,
boolean corruptOnHAStore) throws IOException {
final Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
@@ -107,7 +109,7 @@ public class BlobCacheCorruptionTest extends TestLogger {
try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
- testGetFailsFromCorruptFile(jobId, highAvailability, corruptOnHAStore, config,
+ testGetFailsFromCorruptFile(jobId, blobType, corruptOnHAStore, config,
blobStoreService, exception);
} finally {
if (blobStoreService != null) {
@@ -117,13 +119,36 @@ public class BlobCacheCorruptionTest extends TestLogger {
}
/**
+ * Checks that the GET operation fails when the downloaded file (from HA store)
+ * is corrupt, i.e. its content's hash does not match the {@link BlobKey}'s hash, using a
+ * permanent BLOB.
+ *
+ * @param jobId
+ * job ID
+ * @param config
+ * blob server configuration (including HA settings like {@link HighAvailabilityOptions#HA_STORAGE_PATH}
+ * and {@link HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up <tt>blobStore</tt>
+ * @param blobStore
+ * shared HA blob store to use
+ * @param expectedException
+ * expected exception rule to use
+ */
+ public static void testGetFailsFromCorruptFile(
+ JobID jobId, Configuration config, BlobStore blobStore,
+ ExpectedException expectedException) throws IOException {
+
+ testGetFailsFromCorruptFile(jobId, PERMANENT_BLOB, true, config, blobStore,
+ expectedException);
+ }
+
+ /**
* Checks the GET operation fails when the downloaded file (from {@link BlobServer} or HA store)
* is corrupt, i.e. its content's hash does not match the {@link BlobKey}'s hash.
*
* @param jobId
* job ID or <tt>null</tt> if job-unrelated
- * @param highAvailability
- * whether to use HA mode accessors
+ * @param blobType
+ * whether the BLOB should become permanent or transient
* @param corruptOnHAStore
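- * whether the file should be corrupt in the HA store (<tt>true</tt>, required
- * <tt>highAvailability</tt> to be set) or on the {@link BlobServer}'s local store
+ * whether the file should be corrupt in the HA store (<tt>true</tt>, requires
+ * <tt>blobType</tt> to be permanent) or on the {@link BlobServer}'s local store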
@@ -136,17 +161,19 @@ public class BlobCacheCorruptionTest extends TestLogger {
* @param expectedException
* expected exception rule to use
*/
- public static void testGetFailsFromCorruptFile(
- JobID jobId, boolean highAvailability, boolean corruptOnHAStore, Configuration config,
- BlobStore blobStore, ExpectedException expectedException) throws IOException {
+ private static void testGetFailsFromCorruptFile(
+ @Nullable JobID jobId, BlobKey.BlobType blobType, boolean corruptOnHAStore,
+ Configuration config, BlobStore blobStore, ExpectedException expectedException)
+ throws IOException {
- assertTrue("corrupt HA file requires a HA setup", !corruptOnHAStore || highAvailability);
+ assertTrue("corrupt HA file requires a HA setup",
+ !corruptOnHAStore || blobType == PERMANENT_BLOB);
Random rnd = new Random();
try (
BlobServer server = new BlobServer(config, blobStore);
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, corruptOnHAStore ? blobStore : new VoidBlobStore())) {
server.start();
@@ -155,7 +182,7 @@ public class BlobCacheCorruptionTest extends TestLogger {
rnd.nextBytes(data);
// put content addressable (like libraries)
- BlobKey key = put(server, jobId, data, highAvailability);
+ BlobKey key = put(server, jobId, data, blobType);
assertNotNull(key);
// change server/HA store file contents to make sure that GET requests fail
@@ -186,7 +213,7 @@ public class BlobCacheCorruptionTest extends TestLogger {
expectedException.expectCause(CoreMatchers.allOf(instanceOf(IOException.class),
hasProperty("message", containsString("data corruption"))));
- get(cache, jobId, key, highAvailability);
+ get(cache, jobId, key);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
index 20b0152..8ea2a5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
@@ -67,42 +68,43 @@ public class BlobCacheDeleteTest extends TestLogger {
@Test
public void testDeleteTransient1() throws IOException {
- testDeleteTransient(null, new JobID());
+ testDelete(null, new JobID());
}
@Test
public void testDeleteTransient2() throws IOException {
- testDeleteTransient(new JobID(), null);
+ testDelete(new JobID(), null);
}
@Test
public void testDeleteTransient3() throws IOException {
- testDeleteTransient(null, null);
+ testDelete(null, null);
}
@Test
public void testDeleteTransient4() throws IOException {
- testDeleteTransient(new JobID(), new JobID());
+ testDelete(new JobID(), new JobID());
}
/**
* Uploads a (different) byte array for each of the given jobs and verifies that deleting one of
- * them (via the {@link BlobCache}) does not influence the other.
+ * them (via the {@link BlobCacheService}) does not influence the other.
*
- * @param jobId1
+ * @param jobId1
* first job id
* @param jobId2
* second job id
*/
- private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2)
+ private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2)
throws IOException {
+ final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2));
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -113,38 +115,46 @@ public class BlobCacheDeleteTest extends TestLogger {
data2[0] ^= 1;
// put first BLOB
- BlobKey key1 = put(server, jobId1, data, false);
+ TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, data, TRANSIENT_BLOB);
assertNotNull(key1);
// put two more BLOBs (same key, other key) for another job ID
- BlobKey key2a = put(server, jobId2, data, false);
+ TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB);
assertNotNull(key2a);
assertEquals(key1, key2a);
- BlobKey key2b = put(server, jobId2, data2, false);
+ TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB);
assertNotNull(key2b);
// issue a DELETE request
assertTrue(delete(cache, jobId1, key1));
- verifyDeleted(cache, jobId1, key1, false);
- verifyDeleted(server, jobId1, key1, false);
+ // delete only works on local cache!
+ assertTrue(server.getStorageLocation(jobId1, key1).exists());
+ // delete on server so that the cache cannot re-download
+ assertTrue(server.deleteInternal(jobId1, key1));
+ verifyDeleted(cache, jobId1, key1);
// deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different
- if ((jobId1 == null && jobId2 != null) || (jobId1 != null && !jobId1.equals(jobId2))) {
- verifyContents(server, jobId2, key2a, data, false);
+ if (!sameJobId) {
+ verifyContents(server, jobId2, key2a, data);
}
- verifyContents(server, jobId2, key2b, data2, false);
+ verifyContents(server, jobId2, key2b, data2);
// delete first file of second job
assertTrue(delete(cache, jobId2, key2a));
- verifyDeleted(cache, jobId2, key2a, false);
- verifyDeleted(server, jobId2, key2a, false);
- verifyContents(server, jobId2, key2b, data2, false);
+ // delete only works on local cache (unless already deleted - key1 == key2a)!
+ assertTrue(sameJobId || server.getStorageLocation(jobId2, key2a).exists());
+ // delete on server so that the cache cannot re-download
+ assertTrue(server.deleteInternal(jobId2, key2a));
+ verifyDeleted(cache, jobId2, key2a);
+ verifyContents(server, jobId2, key2b, data2);
// delete second file of second job
- assertTrue(delete(cache, jobId2, key2a));
- verifyDeleted(cache, jobId2, key2a, false);
- verifyDeleted(server, jobId2, key2a, false);
- verifyContents(server, jobId2, key2b, data2, false);
+ assertTrue(delete(cache, jobId2, key2b));
+ // delete only works on local cache!
+ assertTrue(server.getStorageLocation(jobId2, key2b).exists());
+ // delete on server so that the cache cannot re-download
+ assertTrue(server.deleteInternal(jobId2, key2b));
+ verifyDeleted(cache, jobId2, key2b);
}
}
@@ -160,7 +170,7 @@ public class BlobCacheDeleteTest extends TestLogger {
/**
* Uploads a byte array for the given job and verifies that deleting it (via the {@link
- * BlobCache}) does not fail independent of whether the file exists.
+ * BlobCacheService}) does not fail independent of whether the file exists.
*
* @param jobId
* job id
@@ -172,7 +182,7 @@ public class BlobCacheDeleteTest extends TestLogger {
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -181,7 +191,7 @@ public class BlobCacheDeleteTest extends TestLogger {
rnd.nextBytes(data);
// put BLOB
- BlobKey key = put(server, jobId, data, false);
+ TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
assertNotNull(key);
File blobFile = server.getStorageLocation(jobId, key);
@@ -189,11 +199,11 @@ public class BlobCacheDeleteTest extends TestLogger {
// DELETE operation should not fail if file is already deleted
assertTrue(delete(cache, jobId, key));
- verifyDeleted(cache, jobId, key, false);
+ verifyDeleted(cache, jobId, key);
// one more delete call that should not fail
assertTrue(delete(cache, jobId, key));
- verifyDeleted(cache, jobId, key, false);
+ verifyDeleted(cache, jobId, key);
}
}
@@ -209,8 +219,8 @@ public class BlobCacheDeleteTest extends TestLogger {
/**
* Uploads a byte array for the given job and verifies that a delete operation (via the {@link
- * BlobCache}) does not fail even if the file is not deletable locally, e.g. via restricting
- * the permissions.
+ * BlobCacheService}) does not fail even if the file is not deletable locally, e.g. via
+ * restricting the permissions.
*
* @param jobId
* job id
@@ -225,7 +235,7 @@ public class BlobCacheDeleteTest extends TestLogger {
File directory = null;
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -235,13 +245,13 @@ public class BlobCacheDeleteTest extends TestLogger {
rnd.nextBytes(data);
// put BLOB
- BlobKey key = put(server, jobId, data, false);
+ TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
assertNotNull(key);
// access from cache once to have it available there
- verifyContents(cache, jobId, key, data, false);
+ verifyContents(cache, jobId, key, data);
- blobFile = cache.getTransientBlobStore().getStorageLocation(jobId, key);
+ blobFile = cache.getTransientBlobService().getStorageLocation(jobId, key);
directory = blobFile.getParentFile();
assertTrue(blobFile.setWritable(false, false));
@@ -249,10 +259,10 @@ public class BlobCacheDeleteTest extends TestLogger {
// issue a DELETE request
assertFalse(delete(cache, jobId, key));
- verifyDeleted(server, jobId, key, false);
+ verifyDeleted(server, jobId, key);
// the file should still be there on both cache and server
- verifyContents(cache, jobId, key, data, false);
+ verifyContents(cache, jobId, key, data);
} finally {
if (blobFile != null && directory != null) {
//noinspection ResultOfMethodCallIgnored
@@ -265,83 +275,14 @@ public class BlobCacheDeleteTest extends TestLogger {
}
@Test
- public void testDeleteTransientRemoteFailsNoJob() throws IOException {
- testDeleteTransientRemoteFails(null);
- }
-
- @Test
- public void testDeleteTransientRemoteFailsForJob() throws IOException {
- testDeleteTransientRemoteFails(new JobID());
- }
-
- /**
- * Uploads a byte array for the given job and verifies that a delete operation (via the {@link
- * BlobCache}) does not fail even if the file is not deletable on the {@link BlobServer}, e.g.
- * via restricting the permissions.
- *
- * @param jobId
- * job id
- */
- private void testDeleteTransientRemoteFails(@Nullable final JobID jobId) throws IOException {
- assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
-
- final Configuration config = new Configuration();
- config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
-
- File blobFile = null;
- File directory = null;
-
- try (
- BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
- config, new VoidBlobStore())) {
-
- server.start();
-
- try {
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
-
- // put BLOB
- BlobKey key = put(server, jobId, data, false);
- assertNotNull(key);
-
- // access from cache once to have it available there
- verifyContents(cache, jobId, key, data, false);
-
- blobFile = server.getStorageLocation(jobId, key);
- directory = blobFile.getParentFile();
-
- assertTrue(blobFile.setWritable(false, false));
- assertTrue(directory.setWritable(false, false));
-
- // issue a DELETE request
- assertFalse(delete(cache, jobId, key));
- File blobFileAtCache = cache.getTransientBlobStore().getStorageLocation(jobId, key);
- assertFalse(blobFileAtCache.exists());
-
- // the file should still be there on the server
- verifyContents(server, jobId, key, data, false);
- // ... and may be retrieved by the cache
- verifyContents(cache, jobId, key, data, false);
- } finally {
- if (blobFile != null && directory != null) {
- //noinspection ResultOfMethodCallIgnored
- blobFile.setWritable(true, false);
- //noinspection ResultOfMethodCallIgnored
- directory.setWritable(true, false);
- }
- }
- }
- }
-
- @Test
- public void testConcurrentDeleteOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+ public void testConcurrentDeleteOperationsNoJobTransient()
+ throws IOException, ExecutionException, InterruptedException {
testConcurrentDeleteOperations(null);
}
@Test
- public void testConcurrentDeleteOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+ public void testConcurrentDeleteOperationsForJobTransient()
+ throws IOException, ExecutionException, InterruptedException {
testConcurrentDeleteOperations(new JobID());
}
@@ -352,9 +293,9 @@ public class BlobCacheDeleteTest extends TestLogger {
* blob file exist and then one of them fails deleting it. Without the introduced lock, this
* situation should rarely happen and make this test fail. Thus, if this test should become
* "unstable", then the delete atomicity is most likely broken.
- *
- * @param jobId
+ * @param jobId
* job ID to use (or <tt>null</tt> if job-unrelated)
+ *
*/
private void testConcurrentDeleteOperations(@Nullable final JobID jobId)
throws IOException, InterruptedException, ExecutionException {
@@ -371,12 +312,13 @@ public class BlobCacheDeleteTest extends TestLogger {
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
- final BlobKey blobKey = put(server, jobId, data, false);
+ final TransientBlobKey blobKey =
+ (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
assertTrue(server.getStorageLocation(jobId, blobKey).exists());
@@ -386,8 +328,9 @@ public class BlobCacheDeleteTest extends TestLogger {
() -> {
try {
assertTrue(delete(cache, jobId, blobKey));
- assertFalse(cache.getTransientBlobStore().getStorageLocation(jobId, blobKey).exists());
- assertFalse(server.getStorageLocation(jobId, blobKey).exists());
+ assertFalse(cache.getTransientBlobService().getStorageLocation(jobId, blobKey).exists());
+ // delete only works on local cache!
+ assertTrue(server.getStorageLocation(jobId, blobKey).exists());
return null;
} catch (IOException e) {
throw new CompletionException(new FlinkException(
@@ -404,7 +347,8 @@ public class BlobCacheDeleteTest extends TestLogger {
// in case of no lock, one of the delete operations should eventually fail
waitFuture.get();
- assertFalse(server.getStorageLocation(jobId, blobKey).exists());
+ // delete only works on local cache!
+ assertTrue(server.getStorageLocation(jobId, blobKey).exists());
} finally {
executor.shutdownNow();
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
index 52adc4d..bed27d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -35,6 +36,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.AccessDeniedException;
@@ -51,24 +53,37 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
+import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX;
import static org.apache.flink.runtime.blob.BlobUtils.NO_JOB_DIR_PREFIX;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
/**
- * Tests how failing GET requests behave in the presence of failures when used with a {@link
- * BlobCache}.
+ * Tests for GET-specific parts of the {@link BlobCacheService}.
*
- * <p>Successful GET requests are tested in conjunction wit the PUT requests.
+ * <p>This includes access of transient BLOBs from the {@link PermanentBlobCache}, permanent BLOBs from
+ * the {@link TransientBlobCache}, and how failing GET requests behave in the presence of failures
+ * when used with a {@link BlobCacheService}.
+ *
+ * <p>Most successful GET requests are tested in conjunction with the PUT requests by {@link
+ * BlobCachePutTest}.
*/
public class BlobCacheGetTest extends TestLogger {
@@ -81,23 +96,23 @@ public class BlobCacheGetTest extends TestLogger {
public final ExpectedException exception = ExpectedException.none();
@Test
- public void testGetFailsDuringLookup1() throws IOException {
- testGetFailsDuringLookup(null, new JobID(), false);
+ public void testGetTransientFailsDuringLookup1() throws IOException, InterruptedException {
+ testGetFailsDuringLookup(null, new JobID(), TRANSIENT_BLOB);
}
@Test
- public void testGetFailsDuringLookup2() throws IOException {
- testGetFailsDuringLookup(new JobID(), new JobID(), false);
+ public void testGetTransientFailsDuringLookup2() throws IOException, InterruptedException {
+ testGetFailsDuringLookup(new JobID(), new JobID(), TRANSIENT_BLOB);
}
@Test
- public void testGetFailsDuringLookup3() throws IOException {
- testGetFailsDuringLookup(new JobID(), null, false);
+ public void testGetTransientFailsDuringLookup3() throws IOException, InterruptedException {
+ testGetFailsDuringLookup(new JobID(), null, TRANSIENT_BLOB);
}
@Test
- public void testGetFailsDuringLookupHa() throws IOException {
- testGetFailsDuringLookup(new JobID(), new JobID(), true);
+ public void testGetFailsDuringLookupHa() throws IOException, InterruptedException {
+ testGetFailsDuringLookup(new JobID(), new JobID(), PERMANENT_BLOB);
}
/**
@@ -105,15 +120,17 @@ public class BlobCacheGetTest extends TestLogger {
*
* @param jobId1 first job ID or <tt>null</tt> if job-unrelated
* @param jobId2 second job ID different to <tt>jobId1</tt>
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2, boolean highAvailability)
- throws IOException {
+ private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2, BlobKey.BlobType blobType)
+ throws IOException, InterruptedException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -122,75 +139,79 @@ public class BlobCacheGetTest extends TestLogger {
rnd.nextBytes(data);
// put content addressable (like libraries)
- BlobKey key = put(server, jobId1, data, highAvailability);
+ BlobKey key = put(server, jobId1, data, blobType);
assertNotNull(key);
+ verifyType(blobType, key);
// delete file to make sure that GET requests fail
File blobFile = server.getStorageLocation(jobId1, key);
assertTrue(blobFile.delete());
// issue a GET request that fails
- verifyDeleted(cache, jobId1, key, highAvailability);
+ verifyDeleted(cache, jobId1, key);
// add the same data under a second jobId
- BlobKey key2 = put(server, jobId2, data, highAvailability);
+ BlobKey key2 = put(server, jobId2, data, blobType);
assertNotNull(key);
assertEquals(key, key2);
// request for jobId2 should succeed
- get(cache, jobId2, key, highAvailability);
+ get(cache, jobId2, key);
// request for jobId1 should still fail
- verifyDeleted(cache, jobId1, key, highAvailability);
-
- // delete on cache, try to retrieve again
- if (highAvailability) {
- blobFile = cache.getPermanentBlobStore().getStorageLocation(jobId2, key);
- } else {
- blobFile = cache.getTransientBlobStore().getStorageLocation(jobId2, key);
- }
- assertTrue(blobFile.delete());
- get(cache, jobId2, key, highAvailability);
-
- // delete on cache and server, verify that it is not accessible anymore
- if (highAvailability) {
- blobFile = cache.getPermanentBlobStore().getStorageLocation(jobId2, key);
+ verifyDeleted(cache, jobId1, key);
+
+ if (blobType == PERMANENT_BLOB) {
+ // still existing on server
+ assertTrue(server.getStorageLocation(jobId2, key).exists());
+ // delete jobId2 on cache
+ blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+ assertTrue(blobFile.delete());
+ // try to retrieve again
+ get(cache, jobId2, key);
+
+ // delete on cache and server, verify that it is not accessible anymore
+ blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+ assertTrue(blobFile.delete());
+ blobFile = server.getStorageLocation(jobId2, key);
+ assertTrue(blobFile.delete());
+ verifyDeleted(cache, jobId2, key);
} else {
- blobFile = cache.getTransientBlobStore().getStorageLocation(jobId2, key);
+ // deleted eventually on the server by the GET request above
+ verifyDeletedEventually(server, jobId2, key);
+ // delete jobId2 on cache
+ blobFile = cache.getTransientBlobService().getStorageLocation(jobId2, key);
+ assertTrue(blobFile.delete());
+ // verify that it is not accessible anymore
+ verifyDeleted(cache, jobId2, key);
}
- assertTrue(blobFile.delete());
- blobFile = server.getStorageLocation(jobId2, key);
- assertTrue(blobFile.delete());
- verifyDeleted(cache, jobId2, key, highAvailability);
}
}
@Test
public void testGetFailsIncomingNoJob() throws IOException {
- testGetFailsIncoming(null, false);
+ testGetFailsIncoming(null, TRANSIENT_BLOB);
}
@Test
public void testGetFailsIncomingForJob() throws IOException {
- testGetFailsIncoming(new JobID(), false);
+ testGetFailsIncoming(new JobID(), TRANSIENT_BLOB);
}
@Test
public void testGetFailsIncomingForJobHa() throws IOException {
- testGetFailsIncoming(new JobID(), true);
+ testGetFailsIncoming(new JobID(), PERMANENT_BLOB);
}
/**
- * Retrieves a BLOB via a {@link BlobCache} which cannot create incoming files. File transfers
- * should fail.
+ * Retrieves a BLOB via a {@link BlobCacheService} which cannot create incoming files. File
+ * transfers should fail.
*
* @param jobId
* job id
- * @param highAvailability
- * whether to retrieve a permanent blob (<tt>true</tt>, via {@link
- * PermanentBlobCache#getHAFile(JobID, BlobKey)}) or not (<tt>false</tt>, via {@link
- * TransientBlobCache#getFile(JobID, BlobKey)})
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testGetFailsIncoming(@Nullable final JobID jobId, boolean highAvailability)
+ private void testGetFailsIncoming(@Nullable final JobID jobId, BlobKey.BlobType blobType)
throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
@@ -200,7 +221,7 @@ public class BlobCacheGetTest extends TestLogger {
File tempFileDir = null;
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -208,15 +229,16 @@ public class BlobCacheGetTest extends TestLogger {
// store the data on the server
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- BlobKey blobKey = put(server, jobId, data, highAvailability);
+ BlobKey blobKey = put(server, jobId, data, blobType);
+ verifyType(blobType, blobKey);
// make sure the blob cache cannot create any files in its storage dir
- if (highAvailability) {
+ if (blobType == PERMANENT_BLOB) {
tempFileDir =
- cache.getPermanentBlobStore().createTemporaryFilename().getParentFile();
+ cache.getPermanentBlobService().createTemporaryFilename().getParentFile();
} else {
tempFileDir =
- cache.getTransientBlobStore().createTemporaryFilename().getParentFile();
+ cache.getTransientBlobService().createTemporaryFilename().getParentFile();
}
assertTrue(tempFileDir.setExecutable(true, false));
assertTrue(tempFileDir.setReadable(true, false));
@@ -227,7 +249,7 @@ public class BlobCacheGetTest extends TestLogger {
exception.expectMessage("Failed to fetch BLOB ");
try {
- get(cache, jobId, blobKey, highAvailability);
+ get(cache, jobId, blobKey);
} finally {
HashSet<String> expectedDirs = new HashSet<>();
expectedDirs.add("incoming");
@@ -254,6 +276,9 @@ public class BlobCacheGetTest extends TestLogger {
File noJobDir = new File(tempFileDir.getParentFile(), NO_JOB_DIR_PREFIX);
assertArrayEquals(new String[] {}, noJobDir.list());
}
+
+ // file should still be there on the server (even if transient)
+ assertTrue(server.getStorageLocation(jobId, blobKey).exists());
}
} finally {
// set writable again to make sure we can remove the directory
@@ -265,33 +290,31 @@ public class BlobCacheGetTest extends TestLogger {
}
@Test
- public void testGetFailsStoreNoJob() throws IOException {
- testGetFailsStore(null, false);
+ public void testGetTransientFailsStoreNoJob() throws IOException, InterruptedException {
+ testGetFailsStore(null, TRANSIENT_BLOB);
}
@Test
- public void testGetFailsStoreForJob() throws IOException {
- testGetFailsStore(new JobID(), false);
+ public void testGetTransientFailsStoreForJob() throws IOException, InterruptedException {
+ testGetFailsStore(new JobID(), TRANSIENT_BLOB);
}
@Test
- public void testGetFailsStoreForJobHa() throws IOException {
- testGetFailsStore(new JobID(), true);
+ public void testGetPermanentFailsStoreForJob() throws IOException, InterruptedException {
+ testGetFailsStore(new JobID(), PERMANENT_BLOB);
}
/**
- * Retrieves a BLOB via a {@link BlobCache} which cannot create the final storage file. File
- * transfers should fail.
+ * Retrieves a BLOB via a {@link BlobCacheService} which cannot create the final storage file.
+ * File transfers should fail.
*
* @param jobId
* job id
- * @param highAvailability
- * whether to retrieve a permanent blob (<tt>true</tt>, via {@link
- * PermanentBlobCache#getHAFile(JobID, BlobKey)}) or not (<tt>false</tt>, via {@link
- * TransientBlobCache#getFile(JobID, BlobKey)})
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testGetFailsStore(@Nullable final JobID jobId, boolean highAvailability)
- throws IOException {
+ private void testGetFailsStore(@Nullable final JobID jobId, BlobKey.BlobType blobType)
+ throws IOException, InterruptedException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
final Configuration config = new Configuration();
@@ -300,7 +323,7 @@ public class BlobCacheGetTest extends TestLogger {
File jobStoreDir = null;
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -308,14 +331,17 @@ public class BlobCacheGetTest extends TestLogger {
// store the data on the server
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- BlobKey blobKey = put(server, jobId, data, highAvailability);
+ BlobKey blobKey = put(server, jobId, data, blobType);
+ verifyType(blobType, blobKey);
// make sure the blob cache cannot create any files in its storage dir
- if (highAvailability) {
- jobStoreDir = cache.getPermanentBlobStore().getStorageLocation(jobId, new BlobKey())
+ if (blobType == PERMANENT_BLOB) {
+ jobStoreDir = cache.getPermanentBlobService()
+ .getStorageLocation(jobId, new PermanentBlobKey())
.getParentFile();
} else {
- jobStoreDir = cache.getTransientBlobStore().getStorageLocation(jobId, new BlobKey())
+ jobStoreDir = cache.getTransientBlobService()
+ .getStorageLocation(jobId, new TransientBlobKey())
.getParentFile();
}
assertTrue(jobStoreDir.setExecutable(true, false));
@@ -326,7 +352,7 @@ public class BlobCacheGetTest extends TestLogger {
exception.expect(AccessDeniedException.class);
try {
- get(cache, jobId, blobKey, highAvailability);
+ get(cache, jobId, blobKey);
} finally {
// there should be no remaining incoming files
File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
@@ -334,6 +360,14 @@ public class BlobCacheGetTest extends TestLogger {
// there should be no files in the job directory
assertArrayEquals(new String[] {}, jobStoreDir.list());
+
+ // if transient, the get will fail but since the download was successful, the file
+ // will not be on the server anymore
+ if (blobType == TRANSIENT_BLOB) {
+ verifyDeletedEventually(server, jobId, blobKey);
+ } else {
+ assertTrue(server.getStorageLocation(jobId, blobKey).exists());
+ }
}
} finally {
// set writable again to make sure we can remove the directory
@@ -357,7 +391,7 @@ public class BlobCacheGetTest extends TestLogger {
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -365,7 +399,7 @@ public class BlobCacheGetTest extends TestLogger {
// store the data on the server (and blobStore), remove from local server store
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- BlobKey blobKey = put(server, jobId, data, true);
+ PermanentBlobKey blobKey = (PermanentBlobKey) put(server, jobId, data, PERMANENT_BLOB);
assertTrue(server.getStorageLocation(jobId, blobKey).delete());
File tempFileDir = server.createTemporaryFilename().getParentFile();
@@ -375,7 +409,7 @@ public class BlobCacheGetTest extends TestLogger {
exception.expectMessage("Failed to fetch BLOB ");
try {
- get(cache, jobId, blobKey, true);
+ get(cache, jobId, blobKey);
} finally {
HashSet<String> expectedDirs = new HashSet<>();
expectedDirs.add("incoming");
@@ -393,6 +427,77 @@ public class BlobCacheGetTest extends TestLogger {
}
}
+ @Test
+ public void testGetTransientRemoteDeleteFailsNoJob() throws IOException {
+ testGetTransientRemoteDeleteFails(null);
+ }
+
+ @Test
+ public void testGetTransientRemoteDeleteFailsForJob() throws IOException {
+ testGetTransientRemoteDeleteFails(new JobID());
+ }
+
+ /**
+ * Uploads a byte array for the given job and verifies that a get operation of a transient BLOB
+ * (via the {@link BlobCacheService}, which also tries to delete the file on the {@link BlobServer})
+ * does not fail even if the file is not deletable on the {@link BlobServer}, e.g. because of
+ * restricted permissions.
+ *
+ * @param jobId
+ * job id
+ */
+ private void testGetTransientRemoteDeleteFails(@Nullable final JobID jobId) throws IOException {
+ assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
+
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+ File blobFile = null;
+ File directory = null;
+
+ try (
+ BlobServer server = new BlobServer(config, new VoidBlobStore());
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
+ config, new VoidBlobStore())) {
+
+ server.start();
+
+ try {
+ byte[] data = new byte[2000000];
+ rnd.nextBytes(data);
+
+ // put BLOB
+ TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
+ assertNotNull(key);
+
+ blobFile = server.getStorageLocation(jobId, key);
+ directory = blobFile.getParentFile();
+
+ assertTrue(blobFile.setWritable(false, false));
+ assertTrue(directory.setWritable(false, false));
+
+ // access from cache once, which also tries (and fails) to delete the file on the server
+ verifyContents(cache, jobId, key, data);
+ // delete locally (should not be affected by the server)
+ assertTrue(delete(cache, jobId, key));
+ File blobFileAtCache = cache.getTransientBlobService().getStorageLocation(jobId, key);
+ assertFalse(blobFileAtCache.exists());
+
+ // the file should still be there on the server
+ verifyContents(server, jobId, key, data);
+ // ... and may be retrieved by the cache
+ verifyContents(cache, jobId, key, data);
+ } finally {
+ if (blobFile != null && directory != null) {
+ //noinspection ResultOfMethodCallIgnored
+ blobFile.setWritable(true, false);
+ //noinspection ResultOfMethodCallIgnored
+ directory.setWritable(true, false);
+ }
+ }
+ }
+ }
+
/**
* FLINK-6020
*
@@ -400,22 +505,22 @@ public class BlobCacheGetTest extends TestLogger {
*/
@Test
public void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentGetOperations(null, false, false);
+ testConcurrentGetOperations(null, TRANSIENT_BLOB, false);
}
@Test
public void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentGetOperations(new JobID(), false, false);
+ testConcurrentGetOperations(new JobID(), TRANSIENT_BLOB, false);
}
@Test
public void testConcurrentGetOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
- testConcurrentGetOperations(new JobID(), true, false);
+ testConcurrentGetOperations(new JobID(), PERMANENT_BLOB, false);
}
@Test
public void testConcurrentGetOperationsForJobHa2() throws IOException, ExecutionException, InterruptedException {
- testConcurrentGetOperations(new JobID(), true, true);
+ testConcurrentGetOperations(new JobID(), PERMANENT_BLOB, true);
}
/**
@@ -424,12 +529,12 @@ public class BlobCacheGetTest extends TestLogger {
*
* @param jobId
* job ID to use (or <tt>null</tt> if job-unrelated)
- * @param highAvailability
- * whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
* @param cacheAccessesHAStore
* whether the cache has access to the {@link BlobServer}'s HA store or not
*/
- private void testConcurrentGetOperations(final JobID jobId, final boolean highAvailability,
+ private void testConcurrentGetOperations(final JobID jobId, final BlobKey.BlobType blobType,
final boolean cacheAccessesHAStore)
throws IOException, InterruptedException, ExecutionException {
final Configuration config = new Configuration();
@@ -446,19 +551,19 @@ public class BlobCacheGetTest extends TestLogger {
MessageDigest md = BlobUtils.createMessageDigest();
// create the correct blob key by hashing our input data
- final BlobKey blobKey = new BlobKey(md.digest(data));
+ final BlobKey blobKey = BlobKey.createKey(blobType, md.digest(data));
final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
try (
final BlobServer server = new BlobServer(config, blobStoreServer);
- final BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ final BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, cacheAccessesHAStore ? blobStoreServer : blobStoreCache)) {
server.start();
// upload data first
- assertEquals(blobKey, put(server, jobId, data, highAvailability));
+ assertEquals(blobKey, put(server, jobId, data, blobType));
// now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
for (int i = 0; i < numberConcurrentGetOperations; i++) {
@@ -466,13 +571,13 @@ public class BlobCacheGetTest extends TestLogger {
.supplyAsync(
() -> {
try {
- File file = get(cache, jobId, blobKey, highAvailability);
+ File file = get(cache, jobId, blobKey);
// check that we have read the right data
validateGetAndClose(new FileInputStream(file), data);
return file;
} catch (IOException e) {
throw new CompletionException(new FlinkException(
- "Could not upload blob.", e));
+ "Could not read blob for key " + blobKey + '.', e));
}
}, executor);
@@ -481,8 +586,29 @@ public class BlobCacheGetTest extends TestLogger {
FutureUtils.ConjunctFuture<Collection<File>> filesFuture = FutureUtils.combineAll(getOperations);
- // wait until all operations have completed and check that no exception was thrown
- filesFuture.get();
+ if (blobType == PERMANENT_BLOB) {
+ // wait until all operations have completed and check that no exception was thrown
+ filesFuture.get();
+ } else {
+ // wait for all futures to complete (do not abort on expected exceptions) and check
+ // that at least one succeeded
+ int completedSuccessfully = 0;
+ for (CompletableFuture<File> op : getOperations) {
+ try {
+ op.get();
+ ++completedSuccessfully;
+ } catch (Throwable t) {
+ // transient BLOBs get deleted upon first access, so requests served after the deletion fail with an IOException caused by a FileNotFoundException; only these failures are expected
+ if (!(ExceptionUtils.getRootCause(t) instanceof FileNotFoundException)) {
+ // any other failure is unexpected: propagate it
+ org.apache.flink.util.ExceptionUtils.rethrowIOException(t);
+ }
+ }
+ }
+ // multiple clients may have accessed the BLOB successfully before it was
+ // deleted, but always at least one:
+ assertThat(completedSuccessfully, greaterThanOrEqualTo(1));
+ }
} finally {
executor.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
index 7066b95..aa23c80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
@@ -55,6 +55,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
+import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
+import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.BlockingInputStream;
import static org.apache.flink.runtime.blob.BlobServerPutTest.ChunkedInputStream;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
@@ -154,7 +159,7 @@ public class BlobCachePutTest extends TestLogger {
server.start();
- BlobKey key = new BlobKey();
+ BlobKey key = new TransientBlobKey();
CheckedThread[] threads = new CheckedThread[] {
new TransientBlobCacheGetStorageLocation(cache, jobId, key),
new TransientBlobCacheGetStorageLocation(cache, jobId, key),
@@ -180,7 +185,7 @@ public class BlobCachePutTest extends TestLogger {
server.start();
- BlobKey key = new BlobKey();
+ BlobKey key = new PermanentBlobKey();
CheckedThread[] threads = new CheckedThread[] {
new PermanentBlobCacheGetStorageLocation(cache, jobId, key),
new PermanentBlobCacheGetStorageLocation(cache, jobId, key),
@@ -213,53 +218,51 @@ public class BlobCachePutTest extends TestLogger {
// --------------------------------------------------------------------------------------------
@Test
- public void testPutBufferSuccessfulGet1() throws IOException {
- testPutBufferSuccessfulGet(null, null, false);
+ public void testPutBufferTransientSuccessfulGet1() throws IOException, InterruptedException {
+ testPutBufferSuccessfulGet(null, null, TRANSIENT_BLOB);
}
@Test
- public void testPutBufferSuccessfulGet2() throws IOException {
- testPutBufferSuccessfulGet(null, new JobID(), false);
+ public void testPutBufferTransientSuccessfulGet2() throws IOException, InterruptedException {
+ testPutBufferSuccessfulGet(null, new JobID(), TRANSIENT_BLOB);
}
@Test
- public void testPutBufferSuccessfulGet3() throws IOException {
- testPutBufferSuccessfulGet(new JobID(), new JobID(), false);
+ public void testPutBufferTransientSuccessfulGet3() throws IOException, InterruptedException {
+ testPutBufferSuccessfulGet(new JobID(), new JobID(), TRANSIENT_BLOB);
}
@Test
- public void testPutBufferSuccessfulGet4() throws IOException {
- testPutBufferSuccessfulGet(new JobID(), null, false);
+ public void testPutBufferTransientSuccessfulGet4() throws IOException, InterruptedException {
+ testPutBufferSuccessfulGet(new JobID(), null, TRANSIENT_BLOB);
}
@Test
- public void testPutBufferSuccessfulGetHa() throws IOException {
- testPutBufferSuccessfulGet(new JobID(), new JobID(), true);
+ public void testPutBufferPermanentSuccessfulGet() throws IOException, InterruptedException {
+ testPutBufferSuccessfulGet(new JobID(), new JobID(), PERMANENT_BLOB);
}
/**
- * Uploads two byte arrays for different jobs into the server via the {@link BlobCache}. File
- * transfers should be successful.
+ * Uploads two byte arrays for different jobs into the server via the {@link BlobCacheService}.
+ * File transfers should be successful.
*
* @param jobId1
* first job id
* @param jobId2
* second job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>, via {@link
- * BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)}) or not
- * (<tt>false</tt>, via {@link TransientBlobCache#put(JobID, byte[])}
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private void testPutBufferSuccessfulGet(
- @Nullable JobID jobId1, @Nullable JobID jobId2, boolean highAvailability)
- throws IOException {
+ @Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType)
+ throws IOException, InterruptedException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -269,58 +272,86 @@ public class BlobCachePutTest extends TestLogger {
byte[] data2 = Arrays.copyOfRange(data, 10, 54);
// put data for jobId1 and verify
- BlobKey key1a = put(cache, jobId1, data, highAvailability);
+ BlobKey key1a = put(cache, jobId1, data, blobType);
assertNotNull(key1a);
+ verifyType(blobType, key1a);
- BlobKey key1b = put(cache, jobId1, data2, highAvailability);
+ BlobKey key1b = put(cache, jobId1, data2, blobType);
assertNotNull(key1b);
+ verifyType(blobType, key1b);
// files should be available on the server
- verifyContents(server, jobId1, key1a, data, highAvailability);
- verifyContents(server, jobId1, key1b, data2, highAvailability);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
- BlobKey key2a = put(cache, jobId2, data, highAvailability);
+ BlobKey key2a = put(cache, jobId2, data, blobType);
assertNotNull(key2a);
assertEquals(key1a, key2a);
- BlobKey key2b = put(cache, jobId2, data2, highAvailability);
+ BlobKey key2b = put(cache, jobId2, data2, blobType);
assertNotNull(key2b);
assertEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
- verifyContents(server, jobId1, key1a, data, highAvailability);
- verifyContents(server, jobId1, key1b, data2, highAvailability);
- verifyContents(server, jobId2, key2a, data, highAvailability);
- verifyContents(server, jobId2, key2b, data2, highAvailability);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
+
+ // now verify we can access the BLOBs from the cache
+ verifyContents(cache, jobId1, key1a, data);
+ verifyContents(cache, jobId1, key1b, data2);
+ verifyContents(cache, jobId2, key2a, data);
+ verifyContents(cache, jobId2, key2b, data2);
+
+ // transient BLOBs should be deleted from the server, eventually
+ if (blobType == TRANSIENT_BLOB) {
+ verifyDeletedEventually(server, jobId1, key1a);
+ verifyDeletedEventually(server, jobId1, key1b);
+ verifyDeletedEventually(server, jobId2, key2a);
+ verifyDeletedEventually(server, jobId2, key2b);
+
+ // the files are still there on the cache though
+ verifyContents(cache, jobId1, key1a, data);
+ verifyContents(cache, jobId1, key1b, data2);
+ verifyContents(cache, jobId2, key2a, data);
+ verifyContents(cache, jobId2, key2b, data2);
+ } else {
+ // still on the server for permanent BLOBs after accesses from a cache
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
+ }
}
}
// --------------------------------------------------------------------------------------------
@Test
- public void testPutStreamSuccessfulGet1() throws IOException {
- testPutStreamSuccessfulGet(null, null);
+ public void testPutStreamTransientSuccessfulGet1() throws IOException, InterruptedException {
+ testPutStreamTransientSuccessfulGet(null, null);
}
@Test
- public void testPutStreamSuccessfulGet2() throws IOException {
- testPutStreamSuccessfulGet(null, new JobID());
+ public void testPutStreamTransientSuccessfulGet2() throws IOException, InterruptedException {
+ testPutStreamTransientSuccessfulGet(null, new JobID());
}
@Test
- public void testPutStreamSuccessfulGet3() throws IOException {
- testPutStreamSuccessfulGet(new JobID(), new JobID());
+ public void testPutStreamTransientSuccessfulGet3() throws IOException, InterruptedException {
+ testPutStreamTransientSuccessfulGet(new JobID(), new JobID());
}
@Test
- public void testPutStreamSuccessfulGet4() throws IOException {
- testPutStreamSuccessfulGet(new JobID(), null);
+ public void testPutStreamTransientSuccessfulGet4() throws IOException, InterruptedException {
+ testPutStreamTransientSuccessfulGet(new JobID(), null);
}
/**
- * Uploads two file streams for different jobs into the server via the {@link BlobCache}. File
- * transfers should be successful.
+ * Uploads two file streams for different jobs into the server via the {@link BlobCacheService}
+ * as transient BLOBs. Transfers should succeed; the cache keeps its copies after server deletion.
*
* <p>Note that high-availability uploads of streams is currently only possible at the {@link
* BlobServer}.
@@ -330,15 +361,15 @@ public class BlobCachePutTest extends TestLogger {
* @param jobId2
* second job id
*/
- private void testPutStreamSuccessfulGet(@Nullable JobID jobId1, @Nullable JobID jobId2)
- throws IOException {
+ private void testPutStreamTransientSuccessfulGet(@Nullable JobID jobId1, @Nullable JobID jobId2)
+ throws IOException, InterruptedException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -348,58 +379,82 @@ public class BlobCachePutTest extends TestLogger {
byte[] data2 = Arrays.copyOfRange(data, 10, 54);
// put data for jobId1 and verify
- BlobKey key1a = put(cache, jobId1, new ByteArrayInputStream(data), false);
+ TransientBlobKey key1a =
+ (TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data), TRANSIENT_BLOB);
assertNotNull(key1a);
- BlobKey key1b = put(cache, jobId1, new ByteArrayInputStream(data2), false);
+ TransientBlobKey key1b = (TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
assertNotNull(key1b);
// files should be available on the server
- verifyContents(server, jobId1, key1a, data, false);
- verifyContents(server, jobId1, key1b, data2, false);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
- BlobKey key2a = put(cache, jobId2, new ByteArrayInputStream(data), false);
+ TransientBlobKey key2a =
+ (TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data), TRANSIENT_BLOB);
assertNotNull(key2a);
assertEquals(key1a, key2a);
- BlobKey key2b = put(cache, jobId2, new ByteArrayInputStream(data2), false);
+ TransientBlobKey key2b = (TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
assertNotNull(key2b);
assertEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
- verifyContents(server, jobId1, key1a, data, false);
- verifyContents(server, jobId1, key1b, data2, false);
- verifyContents(server, jobId2, key2a, data, false);
- verifyContents(server, jobId2, key2b, data2, false);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
+
+ // now verify we can access the BLOBs from the cache
+ verifyContents(cache, jobId1, key1a, data);
+ verifyContents(cache, jobId1, key1b, data2);
+ verifyContents(cache, jobId2, key2a, data);
+ verifyContents(cache, jobId2, key2b, data2);
+
+ // transient BLOBs should be deleted from the server, eventually
+ verifyDeletedEventually(server, jobId1, key1a);
+ verifyDeletedEventually(server, jobId1, key1b);
+ verifyDeletedEventually(server, jobId2, key2a);
+ verifyDeletedEventually(server, jobId2, key2b);
+
+ // the BLOBs must still be readable from the cache's local copies, though
+ verifyContents(cache, jobId1, key1a, data);
+ verifyContents(cache, jobId1, key1b, data2);
+ verifyContents(cache, jobId2, key2a, data);
+ verifyContents(cache, jobId2, key2b, data2);
}
}
// --------------------------------------------------------------------------------------------
@Test
- public void testPutChunkedStreamSuccessfulGet1() throws IOException {
- testPutChunkedStreamSuccessfulGet(null, null);
+ public void testPutChunkedStreamTransientSuccessfulGet1()
+ throws IOException, InterruptedException {
+ testPutChunkedStreamTransientSuccessfulGet(null, null); // both uploads job-unrelated
}
@Test
- public void testPutChunkedStreamSuccessfulGet2() throws IOException {
- testPutChunkedStreamSuccessfulGet(null, new JobID());
+ public void testPutChunkedStreamTransientSuccessfulGet2()
+ throws IOException, InterruptedException {
+ testPutChunkedStreamTransientSuccessfulGet(null, new JobID()); // job-unrelated first, then job-specific
}
@Test
- public void testPutChunkedStreamSuccessfulGet3() throws IOException {
- testPutChunkedStreamSuccessfulGet(new JobID(), new JobID());
+ public void testPutChunkedStreamTransientSuccessfulGet3()
+ throws IOException, InterruptedException {
+ testPutChunkedStreamTransientSuccessfulGet(new JobID(), new JobID()); // two different jobs
}
@Test
- public void testPutChunkedStreamSuccessfulGet4() throws IOException {
- testPutChunkedStreamSuccessfulGet(new JobID(), null);
+ public void testPutChunkedStreamTransientSuccessfulGet4()
+ throws IOException, InterruptedException {
+ testPutChunkedStreamTransientSuccessfulGet(new JobID(), null); // job-specific first, then job-unrelated
}
/**
* Uploads two chunked file streams for different jobs into the server via the {@link
- * BlobCache}. File transfers should be successful.
+ * BlobCacheService} as transient BLOBs. File transfers should be successful.
*
* <p>Note that high-availability uploads of streams is currently only possible at the {@link
* BlobServer}.
@@ -409,15 +464,16 @@ public class BlobCachePutTest extends TestLogger {
* @param jobId2
* second job id
*/
- private void testPutChunkedStreamSuccessfulGet(
- @Nullable JobID jobId1, @Nullable JobID jobId2) throws IOException {
+ private void testPutChunkedStreamTransientSuccessfulGet(
+ @Nullable JobID jobId1, @Nullable JobID jobId2)
+ throws IOException, InterruptedException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -427,30 +483,52 @@ public class BlobCachePutTest extends TestLogger {
byte[] data2 = Arrays.copyOfRange(data, 10, 54);
// put data for jobId1 and verify
- BlobKey key1a = put(cache, jobId1, new ChunkedInputStream(data, 19), false);
+ TransientBlobKey key1a =
+ (TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
assertNotNull(key1a);
- BlobKey key1b = put(cache, jobId1, new ChunkedInputStream(data2, 19), false);
+ TransientBlobKey key1b =
+ (TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
assertNotNull(key1b);
// files should be available on the server
- verifyContents(server, jobId1, key1a, data, false);
- verifyContents(server, jobId1, key1b, data2, false);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
- BlobKey key2a = put(cache, jobId2, new ChunkedInputStream(data, 19), false);
+ TransientBlobKey key2a =
+ (TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
assertNotNull(key2a);
assertEquals(key1a, key2a);
- BlobKey key2b = put(cache, jobId2, new ChunkedInputStream(data2, 19), false);
+ TransientBlobKey key2b =
+ (TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
assertNotNull(key2b);
assertEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
- verifyContents(server, jobId1, key1a, data, false);
- verifyContents(server, jobId1, key1b, data2, false);
- verifyContents(server, jobId2, key2a, data, false);
- verifyContents(server, jobId2, key2b, data2, false);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
+
+ // now verify we can access the BLOBs from the cache
+ verifyContents(cache, jobId1, key1a, data);
+ verifyContents(cache, jobId1, key1b, data2);
+ verifyContents(cache, jobId2, key2a, data);
+ verifyContents(cache, jobId2, key2b, data2);
+
+ // transient BLOBs should be deleted from the server, eventually
+ verifyDeletedEventually(server, jobId1, key1a);
+ verifyDeletedEventually(server, jobId1, key1b);
+ verifyDeletedEventually(server, jobId2, key2a);
+ verifyDeletedEventually(server, jobId2, key2b);
+
+ // the BLOBs must still be readable from the cache's local copies, though
+ verifyContents(cache, jobId1, key1a, data);
+ verifyContents(cache, jobId1, key1b, data2);
+ verifyContents(cache, jobId2, key2a, data);
+ verifyContents(cache, jobId2, key2b, data2);
}
}
@@ -458,31 +536,29 @@ public class BlobCachePutTest extends TestLogger {
@Test
public void testPutBufferFailsNoJob() throws IOException {
- testPutBufferFails(null, false);
+ testPutBufferFails(null, TRANSIENT_BLOB); // job-unrelated transient BLOB
}
@Test
public void testPutBufferFailsForJob() throws IOException {
- testPutBufferFails(new JobID(), false);
+ testPutBufferFails(new JobID(), TRANSIENT_BLOB); // job-specific transient BLOB
}
@Test
public void testPutBufferFailsForJobHa() throws IOException {
- testPutBufferFails(new JobID(), true);
+ testPutBufferFails(new JobID(), PERMANENT_BLOB); // job-specific permanent (HA) BLOB
}
/**
- * Uploads a byte array to a server which cannot create any files via the {@link BlobCache}.
- * File transfers should fail.
+ * Uploads a byte array to a server which cannot create any files via the {@link
+ * BlobCacheService}. File transfers should fail with an {@link IOException}.
*
* @param jobId
* job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>, via {@link
- * BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)}) or not
- * (<tt>false</tt>, via {@link TransientBlobCache#put(JobID, byte[])}
+ * @param blobType
+ * type of BLOB to upload, i.e. whether it should become permanent or transient
*/
- private void testPutBufferFails(@Nullable final JobID jobId, boolean highAvailability)
+ private void testPutBufferFails(@Nullable final JobID jobId, BlobKey.BlobType blobType)
throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
@@ -492,7 +568,7 @@ public class BlobCachePutTest extends TestLogger {
File tempFileDir = null;
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -510,7 +586,7 @@ public class BlobCachePutTest extends TestLogger {
exception.expect(IOException.class);
exception.expectMessage("PUT operation failed: ");
- put(cache, jobId, data, highAvailability);
+ put(cache, jobId, data, blobType);
} finally {
// set writable again to make sure we can remove the directory
@@ -523,31 +599,29 @@ public class BlobCachePutTest extends TestLogger {
@Test
public void testPutBufferFailsIncomingNoJob() throws IOException {
- testPutBufferFailsIncoming(null, false);
+ testPutBufferFailsIncoming(null, TRANSIENT_BLOB); // job-unrelated transient BLOB
}
@Test
public void testPutBufferFailsIncomingForJob() throws IOException {
- testPutBufferFailsIncoming(new JobID(), false);
+ testPutBufferFailsIncoming(new JobID(), TRANSIENT_BLOB); // job-specific transient BLOB
}
@Test
public void testPutBufferFailsIncomingForJobHa() throws IOException {
- testPutBufferFailsIncoming(new JobID(), true);
+ testPutBufferFailsIncoming(new JobID(), PERMANENT_BLOB); // job-specific permanent (HA) BLOB
}
/**
* Uploads a byte array to a server which cannot create incoming files via the {@link
- * BlobCache}. File transfers should fail.
+ * BlobCacheService}. File transfers should fail without creating a job directory.
*
* @param jobId
* job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>, via {@link
- * BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)}) or not
- * (<tt>false</tt>, via {@link TransientBlobCache#put(JobID, byte[])}
+ * @param blobType
+ * type of BLOB to upload, i.e. whether it should become permanent or transient
*/
- private void testPutBufferFailsIncoming(@Nullable final JobID jobId, boolean highAvailability)
+ private void testPutBufferFailsIncoming(@Nullable final JobID jobId, BlobKey.BlobType blobType)
throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
@@ -557,7 +631,7 @@ public class BlobCachePutTest extends TestLogger {
File tempFileDir = null;
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
@@ -576,7 +650,7 @@ public class BlobCachePutTest extends TestLogger {
exception.expectMessage("PUT operation failed: ");
try {
- put(cache, jobId, data, highAvailability);
+ put(cache, jobId, data, blobType);
} finally {
File storageDir = tempFileDir.getParentFile();
// only the incoming directory should exist (no job directory!)
@@ -593,31 +667,29 @@ public class BlobCachePutTest extends TestLogger {
@Test
public void testPutBufferFailsStoreNoJob() throws IOException {
- testPutBufferFailsStore(null, false);
+ testPutBufferFailsStore(null, TRANSIENT_BLOB); // job-unrelated transient BLOB
}
@Test
public void testPutBufferFailsStoreForJob() throws IOException {
- testPutBufferFailsStore(new JobID(), false);
+ testPutBufferFailsStore(new JobID(), TRANSIENT_BLOB); // job-specific transient BLOB
}
@Test
public void testPutBufferFailsStoreForJobHa() throws IOException {
- testPutBufferFailsStore(new JobID(), true);
+ testPutBufferFailsStore(new JobID(), PERMANENT_BLOB); // job-specific permanent (HA) BLOB
}
/**
- * Uploads a byte array to a server which cannot create files via the {@link BlobCache}. File
- * transfers should fail.
+ * Uploads a byte array to a server which cannot create files via the {@link BlobCacheService}.
+ * File transfers should fail and must not leave any incoming files behind.
*
* @param jobId
* job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>, via {@link
- * BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)}) or not
- * (<tt>false</tt>, via {@link TransientBlobCache#put(JobID, byte[])}
+ * @param blobType
+ * type of BLOB to upload, i.e. whether it should become permanent or transient
*/
- private void testPutBufferFailsStore(@Nullable final JobID jobId, boolean highAvailability)
+ private void testPutBufferFailsStore(@Nullable final JobID jobId, BlobKey.BlobType blobType)
throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
@@ -627,13 +699,14 @@ public class BlobCachePutTest extends TestLogger {
File jobStoreDir = null;
try (
BlobServer server = new BlobServer(config, new VoidBlobStore());
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
// make sure the blob server cannot create any files in its storage dir
- jobStoreDir = server.getStorageLocation(jobId, new BlobKey()).getParentFile();
+ jobStoreDir = server.getStorageLocation(jobId,
+ BlobKey.createKey(blobType)).getParentFile();
assertTrue(jobStoreDir.setExecutable(true, false));
assertTrue(jobStoreDir.setReadable(true, false));
assertTrue(jobStoreDir.setWritable(false, false));
@@ -646,7 +719,7 @@ public class BlobCachePutTest extends TestLogger {
exception.expectMessage("PUT operation failed: ");
try {
- put(cache, jobId, data, highAvailability);
+ put(cache, jobId, data, blobType);
} finally {
// there should be no remaining incoming files
File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
@@ -666,17 +739,17 @@ public class BlobCachePutTest extends TestLogger {
@Test
public void testConcurrentPutOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentPutOperations(null, false);
+ testConcurrentPutOperations(null, TRANSIENT_BLOB); // job-unrelated transient BLOBs
}
@Test
public void testConcurrentPutOperationsForJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentPutOperations(new JobID(), false);
+ testConcurrentPutOperations(new JobID(), TRANSIENT_BLOB); // job-specific transient BLOBs
}
@Test
public void testConcurrentPutOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
- testConcurrentPutOperations(new JobID(), true);
+ testConcurrentPutOperations(new JobID(), PERMANENT_BLOB); // job-specific permanent (HA) BLOBs
}
/**
@@ -686,11 +759,11 @@ public class BlobCachePutTest extends TestLogger {
*
* @param jobId
* job ID to use (or <tt>null</tt> if job-unrelated)
- * @param highAvailability
- * whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+ * @param blobType
+ * whether to use permanent ({@code PERMANENT_BLOB}) or transient BLOBs ({@code TRANSIENT_BLOB})
*/
private void testConcurrentPutOperations(
- @Nullable final JobID jobId, final boolean highAvailability)
+ @Nullable final JobID jobId, final BlobKey.BlobType blobType)
throws IOException, InterruptedException, ExecutionException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -705,7 +778,7 @@ public class BlobCachePutTest extends TestLogger {
final byte[] data = new byte[dataSize];
final List<Path> jars;
- if (highAvailability) {
+ if (blobType == PERMANENT_BLOB) {
// implement via JAR file upload instead:
File tmpFile = temporaryFolder.newFile();
FileUtils.writeByteArrayToFile(tmpFile, data);
@@ -720,7 +793,7 @@ public class BlobCachePutTest extends TestLogger {
try (
final BlobServer server = new BlobServer(config, blobStoreServer);
- final BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ final BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, blobStoreCache)) {
server.start();
@@ -732,17 +805,17 @@ public class BlobCachePutTest extends TestLogger {
for (int i = 0; i < concurrentPutOperations; i++) {
final Supplier<BlobKey> callable;
- if (highAvailability) {
+ if (blobType == PERMANENT_BLOB) {
// cannot use a blocking stream here (upload only possible via files)
callable =
() -> {
try {
- List<BlobKey> keys =
+ List<PermanentBlobKey> keys =
BlobClient.uploadJarFiles(serverAddress, config, jobId, jars);
assertEquals(1, keys.size());
BlobKey uploadedKey = keys.get(0);
// check the uploaded file's contents (concurrently)
- verifyContents(server, jobId, uploadedKey, data, true);
+ verifyContents(server, jobId, uploadedKey, data);
return uploadedKey;
} catch (IOException e) {
throw new CompletionException(new FlinkException(
@@ -756,9 +829,9 @@ public class BlobCachePutTest extends TestLogger {
try {
BlockingInputStream inputStream =
new BlockingInputStream(countDownLatch, data);
- BlobKey uploadedKey = put(cache, jobId, inputStream, false);
+ BlobKey uploadedKey = put(cache, jobId, inputStream, blobType);
// check the uploaded file's contents (concurrently)
- verifyContents(server, jobId, uploadedKey, data, false);
+ verifyContents(server, jobId, uploadedKey, data);
return uploadedKey;
} catch (IOException e) {
throw new CompletionException(new FlinkException(
@@ -789,10 +862,10 @@ public class BlobCachePutTest extends TestLogger {
}
// check the uploaded file's contents
- verifyContents(server, jobId, blobKey, data, highAvailability);
+ verifyContents(server, jobId, blobKey, data);
// check that we only uploaded the file once to the blob store
- if (highAvailability) {
+ if (blobType == PERMANENT_BLOB) {
verify(blobStoreServer, times(1)).put(any(File.class), eq(jobId), eq(blobKey));
} else {
// can't really verify much in the other cases other than that the put operations should
@@ -805,4 +878,27 @@ public class BlobCachePutTest extends TestLogger {
executor.shutdownNow();
}
}
+
+ /**
+ * Checks that the given blob will be deleted at the {@link BlobServer} eventually (waits at most 30s).
+ * Polls on the monotonic {@link System#nanoTime()} clock and checks before each sleep.
+ *
+ * @param server
+ * BLOB server
+ * @param jobId
+ * job ID or <tt>null</tt> if job-unrelated
+ * @param key
+ * key identifying the BLOB to request
+ */
+ static void verifyDeletedEventually(BlobServer server, @Nullable JobID jobId, BlobKey key)
+ throws IOException, InterruptedException {
+ final long deadline = System.nanoTime() + 30_000_000_000L; // 30s
+ // compare via subtraction so that a nanoTime() overflow cannot break the timeout
+ while (checkFilesExist(jobId, Collections.singletonList(key), server, false) != 0 &&
+ System.nanoTime() - deadline < 0) {
+ Thread.sleep(10);
+ }
+
+ verifyDeleted(server, jobId, key);
+ }
}