You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:07:04 UTC
[11/14] flink git commit: [FLINK-7068][blob] Introduce permanent and
transient BLOB keys
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
index 9f462c2..1a3f161 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -37,15 +36,19 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Random;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
/**
- * Tests for the recovery of files of a {@link BlobCache} from a HA store.
+ * Tests for the recovery of files of a {@link BlobCacheService} from a HA store.
*/
public class BlobCacheRecoveryTest extends TestLogger {
@@ -60,7 +63,6 @@ public class BlobCacheRecoveryTest extends TestLogger {
public void testBlobCacheRecovery() throws Exception {
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
@@ -80,9 +82,9 @@ public class BlobCacheRecoveryTest extends TestLogger {
/**
* Helper to test that the {@link BlobServer} recovery from its HA store works.
*
- * <p>Uploads two BLOBs to one {@link BlobServer} via a {@link BlobCache} and expects a second
- * {@link BlobCache} to be able to retrieve them from a second {@link BlobServer} that is
- * configured with the same HA store.
+ * <p>Uploads two BLOBs to one {@link BlobServer} via a {@link BlobCacheService} and expects a
+ * second {@link BlobCacheService} to be able to retrieve them from a second {@link BlobServer}
+ * that is configured with the same HA store.
*
* @param config
* blob server configuration (including HA settings like {@link HighAvailabilityOptions#HA_STORAGE_PATH}
@@ -104,10 +106,10 @@ public class BlobCacheRecoveryTest extends TestLogger {
BlobServer server0 = new BlobServer(config, blobStore);
BlobServer server1 = new BlobServer(config, blobStore);
// use VoidBlobStore as the HA store to force download from each server's HA store
- BlobCache cache0 = new BlobCache(
+ BlobCacheService cache0 = new BlobCacheService(
new InetSocketAddress("localhost", server0.getPort()), config,
new VoidBlobStore());
- BlobCache cache1 = new BlobCache(
+ BlobCacheService cache1 = new BlobCacheService(
new InetSocketAddress("localhost", server1.getPort()), config,
new VoidBlobStore())) {
@@ -124,13 +126,15 @@ public class BlobCacheRecoveryTest extends TestLogger {
// Put job-related HA data
JobID[] jobId = new JobID[] { new JobID(), new JobID() };
- keys[0] = put(cache0, jobId[0], expected, true); // Request 1
- keys[1] = put(cache0, jobId[1], expected2, true); // Request 2
+ keys[0] = put(cache0, jobId[0], expected, PERMANENT_BLOB); // Request 1
+ keys[1] = put(cache0, jobId[1], expected2, PERMANENT_BLOB); // Request 2
// put non-HA data
- nonHAKey = put(cache0, jobId[0], expected2, false);
+ nonHAKey = put(cache0, jobId[0], expected2, TRANSIENT_BLOB);
assertNotEquals(keys[0], nonHAKey);
- assertEquals(keys[1], nonHAKey);
+ assertThat(keys[0].getHash(), not(equalTo(nonHAKey.getHash())));
+ assertNotEquals(keys[1], nonHAKey);
+ assertThat(keys[1].getHash(), equalTo(nonHAKey.getHash()));
// check that the storage directory exists
final Path blobServerPath = new Path(storagePath, "blob");
@@ -138,11 +142,11 @@ public class BlobCacheRecoveryTest extends TestLogger {
assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath));
// Verify HA requests from cache1 (connected to server1) with no immediate access to the file
- verifyContents(cache1, jobId[0], keys[0], expected, true);
- verifyContents(cache1, jobId[1], keys[1], expected2, true);
+ verifyContents(cache1, jobId[0], keys[0], expected);
+ verifyContents(cache1, jobId[1], keys[1], expected2);
// Verify non-HA file is not accessible from server1
- verifyDeleted(cache1, jobId[0], nonHAKey, true);
+ verifyDeleted(cache1, jobId[0], nonHAKey);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 9f8bfe3..80e9767 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -33,6 +33,9 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
import static org.junit.Assert.fail;
@@ -54,7 +57,7 @@ public class BlobCacheRetriesTest extends TestLogger {
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
- testBlobFetchRetries(config, new VoidBlobStore(), null, false);
+ testBlobFetchRetries(config, new VoidBlobStore(), null, TRANSIENT_BLOB);
}
/**
@@ -67,7 +70,7 @@ public class BlobCacheRetriesTest extends TestLogger {
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
- testBlobFetchRetries(config, new VoidBlobStore(), new JobID(), false);
+ testBlobFetchRetries(config, new VoidBlobStore(), new JobID(), TRANSIENT_BLOB);
}
/**
@@ -88,7 +91,7 @@ public class BlobCacheRetriesTest extends TestLogger {
try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
- testBlobFetchRetries(config, blobStoreService, new JobID(), true);
+ testBlobFetchRetries(config, blobStoreService, new JobID(), PERMANENT_BLOB);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
@@ -103,25 +106,27 @@ public class BlobCacheRetriesTest extends TestLogger {
* @param config
* configuration to use (the BlobCache will get some additional settings
* set compared to this one)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private static void testBlobFetchRetries(
final Configuration config, final BlobStore blobStore, @Nullable final JobID jobId,
- boolean highAvailability) throws IOException {
+ BlobKey.BlobType blobType) throws IOException {
final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
try (
BlobServer server = new TestingFailingBlobServer(config, blobStore, 2);
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
// upload some blob
- final BlobKey key = BlobServerPutTest.put(server, jobId, data, highAvailability);
+ final BlobKey key = put(server, jobId, data, blobType);
// trigger a download - it should fail the first two times, but retry, and succeed eventually
- verifyContents(cache, jobId, key, data, highAvailability);
+ verifyContents(cache, jobId, key, data);
}
}
@@ -135,7 +140,7 @@ public class BlobCacheRetriesTest extends TestLogger {
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
- testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), null, false);
+ testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), null, TRANSIENT_BLOB);
}
/**
@@ -148,7 +153,7 @@ public class BlobCacheRetriesTest extends TestLogger {
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
- testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), new JobID(), false);
+ testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), new JobID(), TRANSIENT_BLOB);
}
/**
@@ -169,7 +174,7 @@ public class BlobCacheRetriesTest extends TestLogger {
try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
- testBlobFetchWithTooManyFailures(config, blobStoreService, new JobID(), true);
+ testBlobFetchWithTooManyFailures(config, blobStoreService, new JobID(), PERMANENT_BLOB);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
@@ -184,26 +189,28 @@ public class BlobCacheRetriesTest extends TestLogger {
* @param config
* configuration to use (the BlobCache will get some additional settings
* set compared to this one)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private static void testBlobFetchWithTooManyFailures(
final Configuration config, final BlobStore blobStore, @Nullable final JobID jobId,
- boolean highAvailability) throws IOException {
+ BlobKey.BlobType blobType) throws IOException {
final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
try (
BlobServer server = new TestingFailingBlobServer(config, blobStore, 0, 10);
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
config, new VoidBlobStore())) {
server.start();
// upload some blob
- final BlobKey key = BlobServerPutTest.put(server, jobId, data, highAvailability);
+ final BlobKey key = put(server, jobId, data, blobType);
// trigger a download - it should fail eventually
try {
- verifyContents(cache, jobId, key, data, highAvailability);
+ verifyContents(cache, jobId, key, data);
fail("This should fail");
}
catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 7a69804..fcdc714 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -34,11 +34,13 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
/**
- * This class contains unit tests for the {@link BlobCache}.
+ * This class contains unit tests for the {@link BlobCacheService}.
*/
public class BlobCacheSuccessTest extends TestLogger {
@@ -55,7 +57,7 @@ public class BlobCacheSuccessTest extends TestLogger {
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
- uploadFileGetTest(config, null, false, false, false);
+ uploadFileGetTest(config, null, false, false, TRANSIENT_BLOB);
}
/**
@@ -68,7 +70,7 @@ public class BlobCacheSuccessTest extends TestLogger {
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
- uploadFileGetTest(config, new JobID(), false, false, false);
+ uploadFileGetTest(config, new JobID(), false, false, TRANSIENT_BLOB);
}
/**
@@ -85,7 +87,7 @@ public class BlobCacheSuccessTest extends TestLogger {
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getPath());
- uploadFileGetTest(config, new JobID(), true, true, true);
+ uploadFileGetTest(config, new JobID(), true, true, PERMANENT_BLOB);
}
/**
@@ -101,7 +103,7 @@ public class BlobCacheSuccessTest extends TestLogger {
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getPath());
- uploadFileGetTest(config, new JobID(), false, true, true);
+ uploadFileGetTest(config, new JobID(), false, true, PERMANENT_BLOB);
}
/**
@@ -117,12 +119,12 @@ public class BlobCacheSuccessTest extends TestLogger {
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getPath());
- uploadFileGetTest(config, new JobID(), false, false, true);
+ uploadFileGetTest(config, new JobID(), false, false, PERMANENT_BLOB);
}
/**
* Uploads two different BLOBs to the {@link BlobServer} via a {@link BlobClient} and verifies
- * we can access the files from a {@link BlobCache}.
+ * we can access the files from a {@link BlobCacheService}.
*
* @param config
* configuration to use for the server and cache (the final cache's configuration will
@@ -133,12 +135,12 @@ public class BlobCacheSuccessTest extends TestLogger {
* @param cacheHasAccessToFs
* whether the cache should have access to a shared <tt>HA_STORAGE_PATH</tt> (only useful with
* HA mode)
- * @param highAvailability
- * whether to use HA BLOB upload and download methods
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private void uploadFileGetTest(
final Configuration config, @Nullable JobID jobId, boolean shutdownServerAfterUpload,
- boolean cacheHasAccessToFs, boolean highAvailability) throws IOException {
+ boolean cacheHasAccessToFs, BlobKey.BlobType blobType) throws IOException {
final Configuration cacheConfig = new Configuration(config);
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
@@ -162,29 +164,29 @@ public class BlobCacheSuccessTest extends TestLogger {
blobStoreService = BlobUtils.createBlobStoreFromConfig(cacheConfig);
try (
BlobServer server = new BlobServer(config, blobStoreService);
- BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+ BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
cacheConfig, blobStoreService)) {
server.start();
// Upload BLOBs
- BlobKey key1 = put(server, jobId, data, highAvailability);
- BlobKey key2 = put(server, jobId, data2, highAvailability);
+ BlobKey key1 = put(server, jobId, data, blobType);
+ BlobKey key2 = put(server, jobId, data2, blobType);
if (shutdownServerAfterUpload) {
// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
server.close();
}
- verifyContents(cache, jobId, key1, data, highAvailability);
- verifyContents(cache, jobId, key2, data2, highAvailability);
+ verifyContents(cache, jobId, key1, data);
+ verifyContents(cache, jobId, key2, data2);
if (shutdownServerAfterUpload) {
// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
server.close();
- verifyContents(cache, jobId, key1, data, highAvailability);
- verifyContents(cache, jobId, key2, data2, highAvailability);
+ verifyContents(cache, jobId, key1, data);
+ verifyContents(cache, jobId, key2, data2);
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index c4444c8..0e97604 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -44,6 +44,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
+import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -105,16 +108,20 @@ public class BlobClientTest extends TestLogger {
}
/**
- * Prepares a test file for the unit tests, i.e. the methods fills the file with a particular byte patterns and
- * computes the file's BLOB key.
+ * Prepares a test file for the unit tests, i.e. the method fills the file with a particular
+ * byte pattern and computes the file's BLOB key.
*
* @param file
- * the file to prepare for the unit tests
+ * the file to prepare for the unit tests
+ * @param blobType
+ * whether the BLOB should become permanent or transient
+ *
* @return the BLOB key of the prepared file
+ *
* @throws IOException
- * thrown if an I/O error occurs while writing to the test file
+ * thrown if an I/O error occurs while writing to the test file
*/
- private static BlobKey prepareTestFile(File file) throws IOException {
+ private static BlobKey prepareTestFile(File file, BlobKey.BlobType blobType) throws IOException {
MessageDigest md = BlobUtils.createMessageDigest();
@@ -138,7 +145,7 @@ public class BlobClientTest extends TestLogger {
}
}
- return new BlobKey(md.digest());
+ return BlobKey.createKey(blobType, md.digest());
}
/**
@@ -224,29 +231,30 @@ public class BlobClientTest extends TestLogger {
}
@Test
- public void testContentAddressableBufferTransientBlob() throws IOException {
- testContentAddressableBuffer(false);
+ public void testContentAddressableBufferTransientBlob() throws IOException, InterruptedException {
+ testContentAddressableBuffer(TRANSIENT_BLOB);
}
@Test
- public void testContentAddressableBufferPermantBlob() throws IOException {
- testContentAddressableBuffer(true);
+ public void testContentAddressableBufferPermantBlob() throws IOException, InterruptedException {
+ testContentAddressableBuffer(PERMANENT_BLOB);
}
/**
* Tests the PUT/GET operations for content-addressable buffers.
*
- * @param permanentBlob
- * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testContentAddressableBuffer(boolean permanentBlob) throws IOException {
+ private void testContentAddressableBuffer(BlobKey.BlobType blobType)
+ throws IOException, InterruptedException {
BlobClient client = null;
try {
byte[] testBuffer = createTestBuffer();
MessageDigest md = BlobUtils.createMessageDigest();
md.update(testBuffer);
- BlobKey origKey = new BlobKey(md.digest());
+ BlobKey origKey = BlobKey.createKey(blobType, md.digest());
InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
client = new BlobClient(serverAddress, getBlobClientConfig());
@@ -255,24 +263,30 @@ public class BlobClientTest extends TestLogger {
BlobKey receivedKey;
// Store the data (job-unrelated)
- if (!permanentBlob) {
- receivedKey = client.putBuffer(null, testBuffer, 0, testBuffer.length, false);
+ if (blobType == TRANSIENT_BLOB) {
+ receivedKey = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType);
assertEquals(origKey, receivedKey);
}
// try again with a job-related BLOB:
- receivedKey = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, permanentBlob);
+ receivedKey = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType);
assertEquals(origKey, receivedKey);
// Retrieve the data (job-unrelated)
- if (!permanentBlob) {
- validateGetAndClose(client.getInternal(null, receivedKey, false), testBuffer);
+ if (blobType == TRANSIENT_BLOB) {
+ validateGetAndClose(client.getInternal(null, receivedKey), testBuffer);
+ // transient BLOBs should be deleted from the server, eventually
+ verifyDeletedEventually(getBlobServer(), null, receivedKey);
}
// job-related
- validateGetAndClose(client.getInternal(jobId, receivedKey, permanentBlob), testBuffer);
+ validateGetAndClose(client.getInternal(jobId, receivedKey), testBuffer);
+ if (blobType == TRANSIENT_BLOB) {
+ // transient BLOBs should be deleted from the server, eventually
+ verifyDeletedEventually(getBlobServer(), jobId, receivedKey);
+ }
// Check reaction to invalid keys for job-unrelated blobs
- try (InputStream ignored = client.getInternal(null, new BlobKey(), permanentBlob)) {
+ try (InputStream ignored = client.getInternal(null, BlobKey.createKey(blobType))) {
fail("Expected IOException did not occur");
}
catch (IOException fnfe) {
@@ -282,7 +296,7 @@ public class BlobClientTest extends TestLogger {
// Check reaction to invalid keys for job-related blobs
// new client needed (closed from failure above)
client = new BlobClient(serverAddress, getBlobClientConfig());
- try (InputStream ignored = client.getInternal(jobId, new BlobKey(), permanentBlob)) {
+ try (InputStream ignored = client.getInternal(jobId, BlobKey.createKey(blobType))) {
fail("Expected IOException did not occur");
}
catch (IOException fnfe) {
@@ -307,25 +321,28 @@ public class BlobClientTest extends TestLogger {
}
@Test
- public void testContentAddressableStreamTransientBlob() throws IOException {
- testContentAddressableStream(false);
+ public void testContentAddressableStreamTransientBlob()
+ throws IOException, InterruptedException {
+ testContentAddressableStream(TRANSIENT_BLOB);
}
@Test
- public void testContentAddressableStreamPermanentBlob() throws IOException {
- testContentAddressableStream(true);
+ public void testContentAddressableStreamPermanentBlob()
+ throws IOException, InterruptedException {
+ testContentAddressableStream(PERMANENT_BLOB);
}
/**
* Tests the PUT/GET operations for content-addressable streams.
*
- * @param permanentBlob
- * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testContentAddressableStream(boolean permanentBlob) throws IOException {
+ private void testContentAddressableStream(BlobKey.BlobType blobType)
+ throws IOException, InterruptedException {
File testFile = temporaryFolder.newFile();
- BlobKey origKey = prepareTestFile(testFile);
+ BlobKey origKey = prepareTestFile(testFile, blobType);
InputStream is = null;
@@ -335,26 +352,32 @@ public class BlobClientTest extends TestLogger {
BlobKey receivedKey;
// Store the data (job-unrelated)
- if (!permanentBlob) {
+ if (blobType == TRANSIENT_BLOB) {
is = new FileInputStream(testFile);
- receivedKey = client.putInputStream(null, is, false);
+ receivedKey = client.putInputStream(null, is, blobType);
assertEquals(origKey, receivedKey);
}
// try again with a job-related BLOB:
is = new FileInputStream(testFile);
- receivedKey = client.putInputStream(jobId, is, permanentBlob);
+ receivedKey = client.putInputStream(jobId, is, blobType);
assertEquals(origKey, receivedKey);
is.close();
is = null;
// Retrieve the data (job-unrelated)
- if (!permanentBlob) {
- validateGetAndClose(client.getInternal(null, receivedKey, false), testFile);
+ if (blobType == TRANSIENT_BLOB) {
+ validateGetAndClose(client.getInternal(null, receivedKey), testFile);
+ // transient BLOBs should be deleted from the server, eventually
+ verifyDeletedEventually(getBlobServer(), null, receivedKey);
}
// job-related
- validateGetAndClose(client.getInternal(jobId, receivedKey, permanentBlob), testFile);
+ validateGetAndClose(client.getInternal(jobId, receivedKey), testFile);
+ if (blobType == TRANSIENT_BLOB) {
+ // transient BLOBs should be deleted from the server, eventually
+ verifyDeletedEventually(getBlobServer(), jobId, receivedKey);
+ }
} finally {
if (is != null) {
try {
@@ -366,17 +389,17 @@ public class BlobClientTest extends TestLogger {
@Test
public void testGetFailsDuringStreamingNoJobTransientBlob() throws IOException {
- testGetFailsDuringStreaming(null, false);
+ testGetFailsDuringStreaming(null, TRANSIENT_BLOB);
}
@Test
public void testGetFailsDuringStreamingForJobTransientBlob() throws IOException {
- testGetFailsDuringStreaming(new JobID(), false);
+ testGetFailsDuringStreaming(new JobID(), TRANSIENT_BLOB);
}
@Test
public void testGetFailsDuringStreamingForJobPermanentBlob() throws IOException {
- testGetFailsDuringStreaming(new JobID(), true);
+ testGetFailsDuringStreaming(new JobID(), PERMANENT_BLOB);
}
/**
@@ -384,10 +407,10 @@ public class BlobClientTest extends TestLogger {
*
* @param jobId
* job ID or <tt>null</tt> if job-unrelated
- * @param permanentBlob
- * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testGetFailsDuringStreaming(@Nullable final JobID jobId, boolean permanentBlob)
+ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.BlobType blobType)
throws IOException {
try (BlobClient client = new BlobClient(
@@ -398,11 +421,11 @@ public class BlobClientTest extends TestLogger {
rnd.nextBytes(data);
// put content addressable (like libraries)
- BlobKey key = client.putBuffer(jobId, data, 0, data.length, permanentBlob);
+ BlobKey key = client.putBuffer(jobId, data, 0, data.length, blobType);
assertNotNull(key);
// issue a GET request that succeeds
- InputStream is = client.getInternal(jobId, key, permanentBlob);
+ InputStream is = client.getInternal(jobId, key);
byte[] receiveBuffer = new byte[data.length];
int firstChunkLen = 50000;
@@ -440,7 +463,7 @@ public class BlobClientTest extends TestLogger {
static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {
final File testFile = File.createTempFile("testfile", ".dat");
testFile.deleteOnExit();
- prepareTestFile(testFile);
+ prepareTestFile(testFile, PERMANENT_BLOB);
InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
@@ -452,13 +475,13 @@ public class BlobClientTest extends TestLogger {
final InetSocketAddress serverAddress, final Configuration blobClientConfig,
final File testFile) throws IOException {
JobID jobId = new JobID();
- List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
+ List<PermanentBlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
jobId, Collections.singletonList(new Path(testFile.toURI())));
assertEquals(1, blobKeys.size());
try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
- validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0), true), testFile);
+ validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0)), testFile);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 3797f87..49f4fc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -25,7 +25,16 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -54,48 +63,119 @@ public final class BlobKeyTest extends TestLogger {
}
}
+ @Test
+ public void testCreateKey() {
+ BlobKey key = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+ verifyType(PERMANENT_BLOB, key);
+ assertArrayEquals(KEY_ARRAY_1, key.getHash());
+
+ key = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
+ verifyType(TRANSIENT_BLOB, key);
+ assertArrayEquals(KEY_ARRAY_1, key.getHash());
+
+ }
+
+ @Test
+ public void testSerializationTransient() throws Exception {
+ testSerialization(TRANSIENT_BLOB);
+ }
+
+ @Test
+ public void testSerializationPermanent() throws Exception {
+ testSerialization(PERMANENT_BLOB);
+ }
+
/**
* Tests the serialization/deserialization of BLOB keys.
*/
- @Test
- public void testSerialization() throws Exception {
- final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
+ private void testSerialization(BlobKey.BlobType blobType) throws Exception {
+ final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
final BlobKey k2 = CommonTestUtils.createCopySerializable(k1);
assertEquals(k1, k2);
assertEquals(k1.hashCode(), k2.hashCode());
assertEquals(0, k1.compareTo(k2));
}
+ @Test
+ public void testEqualsTransient() {
+ testEquals(TRANSIENT_BLOB);
+ }
+
+ @Test
+ public void testEqualsPermanent() {
+ testEquals(PERMANENT_BLOB);
+ }
+
/**
* Tests the equals method.
*/
- @Test
- public void testEquals() {
- final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
- final BlobKey k2 = new BlobKey(KEY_ARRAY_1);
- final BlobKey k3 = new BlobKey(KEY_ARRAY_2);
+ private void testEquals(BlobKey.BlobType blobType) {
+ final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+ final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+ final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
assertTrue(k1.equals(k2));
+ assertTrue(k2.equals(k1));
assertFalse(k1.equals(k3));
+ assertFalse(k3.equals(k1));
+ }
+
+ /**
+ * Tests the equals method for keys with the same hash but different BLOB types.
+ */
+ @Test
+ public void testEqualsDifferentBlobType() {
+ final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
+ final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+ assertFalse(k1.equals(k2));
+ assertFalse(k2.equals(k1));
+ }
+
+ @Test
+ public void testComparesTransient() {
+ testCompares(TRANSIENT_BLOB);
+ }
+
+ @Test
+ public void testComparesPermanent() {
+ testCompares(PERMANENT_BLOB);
}
/**
* Tests the compares method.
*/
+ private void testCompares(BlobKey.BlobType blobType) {
+ final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+ final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+ final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
+ assertThat(k1.compareTo(k2), is(0));
+ assertThat(k2.compareTo(k1), is(0));
+ assertThat(k1.compareTo(k3), lessThan(0));
+ assertThat(k3.compareTo(k1), greaterThan(0));
+ }
+
+ @Test
+ public void testComparesDifferentBlobType() {
+ final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
+ final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+ assertThat(k1.compareTo(k2), greaterThan(0));
+ assertThat(k2.compareTo(k1), lessThan(0));
+ }
+
+ @Test
+ public void testStreamsTransient() throws Exception {
+ testStreams(TRANSIENT_BLOB);
+ }
+
@Test
- public void testCompares() {
- final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
- final BlobKey k2 = new BlobKey(KEY_ARRAY_1);
- final BlobKey k3 = new BlobKey(KEY_ARRAY_2);
- assertTrue(k1.compareTo(k2) == 0);
- assertTrue(k1.compareTo(k3) < 0);
+ public void testStreamsPermanent() throws Exception {
+ testStreams(PERMANENT_BLOB);
}
/**
* Test the serialization/deserialization using input/output streams.
*/
- @Test
- public void testStreams() throws Exception {
- final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
+ private void testStreams(BlobKey.BlobType blobType) throws IOException {
+ final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
final ByteArrayOutputStream baos = new ByteArrayOutputStream(20);
k1.writeToOutputStream(baos);
@@ -106,4 +186,18 @@ public final class BlobKeyTest extends TestLogger {
assertEquals(k1, k2);
}
+
+ /**
+ * Verifies that the given <tt>key</tt> is of an expected type.
+ *
+ * @param expected the type the key should have
+ * @param key the key to verify
+ */
+ static void verifyType(BlobKey.BlobType expected, BlobKey key) {
+ if (expected == PERMANENT_BLOB) {
+ assertThat(key, is(instanceOf(PermanentBlobKey.class)));
+ } else {
+ assertThat(key, is(instanceOf(TransientBlobKey.class)));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
index c4c6762..56f193b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.util.TestLogger;
@@ -37,6 +36,7 @@ import java.nio.file.Files;
import java.util.Arrays;
import java.util.Random;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.junit.Assert.assertNotNull;
@@ -64,7 +64,6 @@ public class BlobServerCorruptionTest extends TestLogger {
final Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
@@ -108,7 +107,7 @@ public class BlobServerCorruptionTest extends TestLogger {
rnd.nextBytes(data);
// put content addressable (like libraries)
- BlobKey key = put(server, jobId, data, true);
+ BlobKey key = put(server, jobId, data, PERMANENT_BLOB);
assertNotNull(key);
// delete local file to make sure that the GET requests downloads from HA
@@ -131,7 +130,7 @@ public class BlobServerCorruptionTest extends TestLogger {
expectedException.expect(IOException.class);
expectedException.expectMessage("data corruption");
- get(server, jobId, key, true);
+ get(server, jobId, key);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index bb977d3..a110d4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -45,6 +45,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
@@ -95,6 +97,7 @@ public class BlobServerDeleteTest extends TestLogger {
*/
private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2)
throws IOException {
+ final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2));
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -109,35 +112,34 @@ public class BlobServerDeleteTest extends TestLogger {
data2[0] ^= 1;
// put first BLOB
- BlobKey key1 = put(server, jobId1, data, false);
+ TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, data, TRANSIENT_BLOB);
assertNotNull(key1);
// put two more BLOBs (same key, other key) for another job ID
- BlobKey key2a = put(server, jobId2, data, false);
+ TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB);
assertNotNull(key2a);
assertEquals(key1, key2a);
- BlobKey key2b = put(server, jobId2, data2, false);
+ TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB);
assertNotNull(key2b);
// issue a DELETE request
assertTrue(delete(server, jobId1, key1));
- verifyDeleted(server, jobId1, key1, false);
+ verifyDeleted(server, jobId1, key1);
 // deleting one BLOB should not affect another BLOB, even with the same key, if the job IDs are different
- if ((jobId1 == null && jobId2 != null) || (jobId1 != null && !jobId1.equals(jobId2))) {
- verifyContents(server, jobId2, key2a, data, false);
+ if (!sameJobId) {
+ verifyContents(server, jobId2, key2a, data);
}
- verifyContents(server, jobId2, key2b, data2, false);
+ verifyContents(server, jobId2, key2b, data2);
// delete first file of second job
assertTrue(delete(server, jobId2, key2a));
- verifyDeleted(server, jobId2, key2a, false);
- verifyContents(server, jobId2, key2b, data2, false);
+ verifyDeleted(server, jobId2, key2a);
+ verifyContents(server, jobId2, key2b, data2);
// delete second file of second job
- assertTrue(delete(server, jobId2, key2a));
- verifyDeleted(server, jobId2, key2a, false);
- verifyContents(server, jobId2, key2b, data2, false);
+ assertTrue(delete(server, jobId2, key2b));
+ verifyDeleted(server, jobId2, key2b);
}
}
@@ -171,7 +173,7 @@ public class BlobServerDeleteTest extends TestLogger {
rnd.nextBytes(data);
// put BLOB
- BlobKey key = put(server, jobId, data, false);
+ TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
assertNotNull(key);
File blobFile = server.getStorageLocation(jobId, key);
@@ -179,11 +181,11 @@ public class BlobServerDeleteTest extends TestLogger {
// DELETE operation should not fail if file is already deleted
assertTrue(delete(server, jobId, key));
- verifyDeleted(server, jobId, key, false);
+ verifyDeleted(server, jobId, key);
// one more delete call that should not fail
assertTrue(delete(server, jobId, key));
- verifyDeleted(server, jobId, key, false);
+ verifyDeleted(server, jobId, key);
}
}
@@ -223,7 +225,7 @@ public class BlobServerDeleteTest extends TestLogger {
rnd.nextBytes(data);
// put BLOB
- BlobKey key = put(server, jobId, data, false);
+ TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
assertNotNull(key);
blobFile = server.getStorageLocation(jobId, key);
@@ -236,7 +238,7 @@ public class BlobServerDeleteTest extends TestLogger {
assertFalse(delete(server, jobId, key));
// the file should still be there
- verifyContents(server, jobId, key, data, false);
+ verifyContents(server, jobId, key, data);
} finally {
if (blobFile != null && directory != null) {
//noinspection ResultOfMethodCallIgnored
@@ -250,21 +252,21 @@ public class BlobServerDeleteTest extends TestLogger {
@Test
public void testJobCleanup() throws IOException, InterruptedException {
- testJobCleanup(false);
+ testJobCleanup(TRANSIENT_BLOB);
}
@Test
public void testJobCleanupHa() throws IOException, InterruptedException {
- testJobCleanup(true);
+ testJobCleanup(PERMANENT_BLOB);
}
/**
* Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob(JobID)}.
*
- * @param highAvailability
- * whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testJobCleanup(boolean highAvailability) throws IOException {
+ private void testJobCleanup(BlobKey.BlobType blobType) throws IOException {
JobID jobId1 = new JobID();
JobID jobId2 = new JobID();
@@ -280,31 +282,31 @@ public class BlobServerDeleteTest extends TestLogger {
byte[] data2 = Arrays.copyOf(data, data.length);
data2[0] ^= 1;
- BlobKey key1a = put(server, jobId1, data, highAvailability);
- BlobKey key2 = put(server, jobId2, data, highAvailability);
+ BlobKey key1a = put(server, jobId1, data, blobType);
+ BlobKey key2 = put(server, jobId2, data, blobType);
assertEquals(key1a, key2);
- BlobKey key1b = put(server, jobId1, data2, highAvailability);
+ BlobKey key1b = put(server, jobId1, data2, blobType);
- verifyContents(server, jobId1, key1a, data, highAvailability);
- verifyContents(server, jobId1, key1b, data2, highAvailability);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
checkFileCountForJob(2, jobId1, server);
- verifyContents(server, jobId2, key2, data, highAvailability);
+ verifyContents(server, jobId2, key2, data);
checkFileCountForJob(1, jobId2, server);
server.cleanupJob(jobId1);
- verifyDeleted(server, jobId1, key1a, highAvailability);
- verifyDeleted(server, jobId1, key1b, highAvailability);
+ verifyDeleted(server, jobId1, key1a);
+ verifyDeleted(server, jobId1, key1b);
checkFileCountForJob(0, jobId1, server);
- verifyContents(server, jobId2, key2, data, highAvailability);
+ verifyContents(server, jobId2, key2, data);
checkFileCountForJob(1, jobId2, server);
server.cleanupJob(jobId2);
checkFileCountForJob(0, jobId1, server);
- verifyDeleted(server, jobId2, key2, highAvailability);
+ verifyDeleted(server, jobId2, key2);
checkFileCountForJob(0, jobId2, server);
// calling a second time should not fail
@@ -313,12 +315,14 @@ public class BlobServerDeleteTest extends TestLogger {
}
@Test
- public void testConcurrentDeleteOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+ public void testConcurrentDeleteOperationsNoJobTransient()
+ throws IOException, ExecutionException, InterruptedException {
testConcurrentDeleteOperations(null);
}
@Test
- public void testConcurrentDeleteOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+ public void testConcurrentDeleteOperationsForJobTransient()
+ throws IOException, ExecutionException, InterruptedException {
testConcurrentDeleteOperations(new JobID());
}
@@ -350,7 +354,8 @@ public class BlobServerDeleteTest extends TestLogger {
server.start();
- final BlobKey blobKey = put(server, jobId, data, false);
+ final TransientBlobKey blobKey =
+ (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
assertTrue(server.getStorageLocation(jobId, blobKey).exists());
@@ -396,11 +401,11 @@ public class BlobServerDeleteTest extends TestLogger {
*
* @return delete success
*/
- static boolean delete(BlobService service, @Nullable JobID jobId, BlobKey key) {
+ static boolean delete(BlobService service, @Nullable JobID jobId, TransientBlobKey key) {
if (jobId == null) {
- return service.getTransientBlobStore().delete(key);
+ return service.getTransientBlobService().deleteFromCache(key);
} else {
- return service.getTransientBlobStore().delete(jobId, key);
+ return service.getTransientBlobService().deleteFromCache(jobId, key);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 0873aba..4927279 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.FlinkException;
@@ -57,6 +56,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX;
import static org.junit.Assert.assertArrayEquals;
@@ -73,7 +74,8 @@ import static org.mockito.Mockito.mock;
* Tests how failing GET requests behave in the presence of failures when used with a {@link
* BlobServer}.
*
- * <p>Successful GET requests are tested in conjunction wit the PUT requests.
+ * <p>Successful GET requests are tested in conjunction with the PUT requests by {@link
+ * BlobServerPutTest}.
*/
public class BlobServerGetTest extends TestLogger {
@@ -86,23 +88,23 @@ public class BlobServerGetTest extends TestLogger {
public final ExpectedException exception = ExpectedException.none();
@Test
- public void testGetFailsDuringLookup1() throws IOException {
- testGetFailsDuringLookup(null, new JobID(), false);
+ public void testGetTransientFailsDuringLookup1() throws IOException {
+ testGetFailsDuringLookup(null, new JobID(), TRANSIENT_BLOB);
}
@Test
- public void testGetFailsDuringLookup2() throws IOException {
- testGetFailsDuringLookup(new JobID(), new JobID(), false);
+ public void testGetTransientFailsDuringLookup2() throws IOException {
+ testGetFailsDuringLookup(new JobID(), new JobID(), TRANSIENT_BLOB);
}
@Test
- public void testGetFailsDuringLookup3() throws IOException {
- testGetFailsDuringLookup(new JobID(), null, false);
+ public void testGetTransientFailsDuringLookup3() throws IOException {
+ testGetFailsDuringLookup(new JobID(), null, TRANSIENT_BLOB);
}
@Test
- public void testGetFailsDuringLookupHa() throws IOException {
- testGetFailsDuringLookup(new JobID(), new JobID(), true);
+ public void testGetPermanentFailsDuringLookup() throws IOException {
+ testGetFailsDuringLookup(new JobID(), new JobID(), PERMANENT_BLOB);
}
/**
@@ -110,9 +112,11 @@ public class BlobServerGetTest extends TestLogger {
*
* @param jobId1 first job ID or <tt>null</tt> if job-unrelated
* @param jobId2 second job ID different to <tt>jobId1</tt>
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private void testGetFailsDuringLookup(
- @Nullable final JobID jobId1, @Nullable final JobID jobId2, boolean highAvailability)
+ @Nullable final JobID jobId1, @Nullable final JobID jobId2, BlobKey.BlobType blobType)
throws IOException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -125,7 +129,7 @@ public class BlobServerGetTest extends TestLogger {
rnd.nextBytes(data);
// put content addressable (like libraries)
- BlobKey key = put(server, jobId1, data, highAvailability);
+ BlobKey key = put(server, jobId1, data, blobType);
assertNotNull(key);
// delete file to make sure that GET requests fail
@@ -133,22 +137,22 @@ public class BlobServerGetTest extends TestLogger {
assertTrue(blobFile.delete());
// issue a GET request that fails
- verifyDeleted(server, jobId1, key, highAvailability);
+ verifyDeleted(server, jobId1, key);
// add the same data under a second jobId
- BlobKey key2 = put(server, jobId2, data, highAvailability);
+ BlobKey key2 = put(server, jobId2, data, blobType);
assertNotNull(key);
assertEquals(key, key2);
// request for jobId2 should succeed
- get(server, jobId2, key, highAvailability);
+ get(server, jobId2, key);
// request for jobId1 should still fail
- verifyDeleted(server, jobId1, key, highAvailability);
+ verifyDeleted(server, jobId1, key);
// same checks as for jobId1 but for jobId2 should also work:
blobFile = server.getStorageLocation(jobId2, key);
assertTrue(blobFile.delete());
- verifyDeleted(server, jobId2, key, highAvailability);
+ verifyDeleted(server, jobId2, key);
}
}
@@ -164,7 +168,6 @@ public class BlobServerGetTest extends TestLogger {
final Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
@@ -181,7 +184,7 @@ public class BlobServerGetTest extends TestLogger {
// store the data on the server (and blobStore), remove from local store
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- BlobKey blobKey = put(server, jobId, data, true);
+ BlobKey blobKey = put(server, jobId, data, PERMANENT_BLOB);
assertTrue(server.getStorageLocation(jobId, blobKey).delete());
// make sure the blob server cannot create any files in its storage dir
@@ -195,7 +198,7 @@ public class BlobServerGetTest extends TestLogger {
exception.expectMessage("Permission denied");
try {
- get(server, jobId, blobKey, true);
+ get(server, jobId, blobKey);
} finally {
HashSet<String> expectedDirs = new HashSet<>();
expectedDirs.add("incoming");
@@ -236,7 +239,6 @@ public class BlobServerGetTest extends TestLogger {
final Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
@@ -253,7 +255,7 @@ public class BlobServerGetTest extends TestLogger {
// store the data on the server (and blobStore), remove from local store
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- BlobKey blobKey = put(server, jobId, data, true);
+ BlobKey blobKey = put(server, jobId, data, PERMANENT_BLOB);
assertTrue(server.getStorageLocation(jobId, blobKey).delete());
// make sure the blob cache cannot create any files in its storage dir
@@ -266,7 +268,7 @@ public class BlobServerGetTest extends TestLogger {
exception.expect(AccessDeniedException.class);
try {
- get(server, jobId, blobKey, true);
+ get(server, jobId, blobKey);
} finally {
// there should be no remaining incoming files
File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
@@ -307,7 +309,7 @@ public class BlobServerGetTest extends TestLogger {
// store the data on the server (and blobStore), remove from local store
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- BlobKey blobKey = put(server, jobId, data, true);
+ BlobKey blobKey = put(server, jobId, data, PERMANENT_BLOB);
assertTrue(server.getStorageLocation(jobId, blobKey).delete());
File tempFileDir = server.createTemporaryFilename().getParentFile();
@@ -316,7 +318,7 @@ public class BlobServerGetTest extends TestLogger {
exception.expect(NoSuchFileException.class);
try {
- get(server, jobId, blobKey, true);
+ get(server, jobId, blobKey);
} finally {
HashSet<String> expectedDirs = new HashSet<>();
expectedDirs.add("incoming");
@@ -336,17 +338,17 @@ public class BlobServerGetTest extends TestLogger {
@Test
public void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentGetOperations(null, false);
+ testConcurrentGetOperations(null, TRANSIENT_BLOB);
}
@Test
public void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentGetOperations(new JobID(), false);
+ testConcurrentGetOperations(new JobID(), TRANSIENT_BLOB);
}
@Test
public void testConcurrentGetOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
- testConcurrentGetOperations(new JobID(), true);
+ testConcurrentGetOperations(new JobID(), PERMANENT_BLOB);
}
/**
@@ -355,11 +357,11 @@ public class BlobServerGetTest extends TestLogger {
*
* @param jobId
* job ID to use (or <tt>null</tt> if job-unrelated)
- * @param highAvailability
- * whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private void testConcurrentGetOperations(
- @Nullable final JobID jobId, final boolean highAvailability)
+ @Nullable final JobID jobId, final BlobKey.BlobType blobType)
throws IOException, InterruptedException, ExecutionException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -374,7 +376,7 @@ public class BlobServerGetTest extends TestLogger {
MessageDigest md = BlobUtils.createMessageDigest();
// create the correct blob key by hashing our input data
- final BlobKey blobKey = new BlobKey(md.digest(data));
+ final BlobKey blobKey = BlobKey.createKey(blobType, md.digest(data));
doAnswer(
new Answer() {
@@ -396,10 +398,10 @@ public class BlobServerGetTest extends TestLogger {
server.start();
// upload data first
- assertEquals(blobKey, put(server, jobId, data, highAvailability));
+ assertEquals(blobKey, put(server, jobId, data, blobType));
// now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
- if (highAvailability) {
+ if (blobType == PERMANENT_BLOB) {
// remove local copy so that a transfer from HA store takes place
assertTrue(server.getStorageLocation(jobId, blobKey).delete());
}
@@ -407,7 +409,7 @@ public class BlobServerGetTest extends TestLogger {
CompletableFuture<File> getOperation = CompletableFuture.supplyAsync(
() -> {
try {
- File file = get(server, jobId, blobKey, highAvailability);
+ File file = get(server, jobId, blobKey);
// check that we have read the right data
validateGetAndClose(new FileInputStream(file), data);
return file;
@@ -431,8 +433,8 @@ public class BlobServerGetTest extends TestLogger {
/**
* Retrieves the given blob.
*
- * <p>Note that if a {@link BlobCache} is used, it may try to access the {@link BlobServer} to
- * retrieve the blob.
+ * <p>Note that if a {@link BlobCacheService} is used, it may try to access the {@link
+ * BlobServer} to retrieve the blob.
*
* @param service
* BLOB client to use for connecting to the BLOB service
@@ -440,26 +442,22 @@ public class BlobServerGetTest extends TestLogger {
* job ID or <tt>null</tt> if job-unrelated
* @param key
* key identifying the BLOB to request
- * @param highAvailability
- * whether to check HA mode accessors
*/
- static File get(
- BlobService service, @Nullable JobID jobId, BlobKey key, boolean highAvailability)
- throws IOException {
- if (highAvailability) {
- return service.getPermanentBlobStore().getHAFile(jobId, key);
+ static File get(BlobService service, @Nullable JobID jobId, BlobKey key) throws IOException {
+ if (key instanceof PermanentBlobKey) {
+ return service.getPermanentBlobService().getFile(jobId, (PermanentBlobKey) key);
} else if (jobId == null) {
- return service.getTransientBlobStore().getFile(key);
+ return service.getTransientBlobService().getFile((TransientBlobKey) key);
} else {
- return service.getTransientBlobStore().getFile(jobId, key);
+ return service.getTransientBlobService().getFile(jobId, (TransientBlobKey) key);
}
}
/**
* Checks that the given blob does not exist anymore by trying to access it.
*
- * <p>Note that if a {@link BlobCache} is used, it may try to access the {@link BlobServer} to
- * retrieve the blob.
+ * <p>Note that if a {@link BlobCacheService} is used, it may try to access the {@link
+ * BlobServer} to retrieve the blob.
*
* @param service
* BLOB client to use for connecting to the BLOB service
@@ -467,14 +465,11 @@ public class BlobServerGetTest extends TestLogger {
* job ID or <tt>null</tt> if job-unrelated
* @param key
* key identifying the BLOB to request
- * @param highAvailability
- * whether to check HA mode accessors
*/
- static void verifyDeleted(
- BlobService service, @Nullable JobID jobId, BlobKey key, boolean highAvailability)
+ static void verifyDeleted(BlobService service, @Nullable JobID jobId, BlobKey key)
throws IOException {
try {
- get(service, jobId, key, highAvailability);
+ get(service, jobId, key);
fail("File " + jobId + "/" + key + " should have been deleted.");
} catch (IOException e) {
// expected
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 6b38f43..aefd0a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -60,6 +60,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -134,11 +136,15 @@ public class BlobServerPutTest extends TestLogger {
server.start();
- BlobKey key = new BlobKey();
+ BlobKey key1 = new TransientBlobKey();
+ BlobKey key2 = new PermanentBlobKey();
CheckedThread[] threads = new CheckedThread[] {
- new ContentAddressableGetStorageLocation(server, jobId, key),
- new ContentAddressableGetStorageLocation(server, jobId, key),
- new ContentAddressableGetStorageLocation(server, jobId, key)
+ new ContentAddressableGetStorageLocation(server, jobId, key1),
+ new ContentAddressableGetStorageLocation(server, jobId, key1),
+ new ContentAddressableGetStorageLocation(server, jobId, key1),
+ new ContentAddressableGetStorageLocation(server, jobId, key2),
+ new ContentAddressableGetStorageLocation(server, jobId, key2),
+ new ContentAddressableGetStorageLocation(server, jobId, key2)
};
checkedThreadSimpleTest(threads);
}
@@ -168,27 +174,27 @@ public class BlobServerPutTest extends TestLogger {
@Test
public void testPutBufferSuccessfulGet1() throws IOException {
- testPutBufferSuccessfulGet(null, null, false);
+ testPutBufferSuccessfulGet(null, null, TRANSIENT_BLOB);
}
@Test
public void testPutBufferSuccessfulGet2() throws IOException {
- testPutBufferSuccessfulGet(null, new JobID(), false);
+ testPutBufferSuccessfulGet(null, new JobID(), TRANSIENT_BLOB);
}
@Test
public void testPutBufferSuccessfulGet3() throws IOException {
- testPutBufferSuccessfulGet(new JobID(), new JobID(), false);
+ testPutBufferSuccessfulGet(new JobID(), new JobID(), TRANSIENT_BLOB);
}
@Test
public void testPutBufferSuccessfulGet4() throws IOException {
- testPutBufferSuccessfulGet(new JobID(), null, false);
+ testPutBufferSuccessfulGet(new JobID(), null, TRANSIENT_BLOB);
}
@Test
public void testPutBufferSuccessfulGetHa() throws IOException {
- testPutBufferSuccessfulGet(new JobID(), new JobID(), true);
+ testPutBufferSuccessfulGet(new JobID(), new JobID(), PERMANENT_BLOB);
}
/**
@@ -199,11 +205,11 @@ public class BlobServerPutTest extends TestLogger {
* first job id
* @param jobId2
* second job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>) or not
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private void testPutBufferSuccessfulGet(
- @Nullable JobID jobId1, @Nullable JobID jobId2, boolean highAvailability)
+ @Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType)
throws IOException {
final Configuration config = new Configuration();
@@ -218,29 +224,34 @@ public class BlobServerPutTest extends TestLogger {
byte[] data2 = Arrays.copyOfRange(data, 10, 54);
// put data for jobId1 and verify
- BlobKey key1a = put(server, jobId1, data, highAvailability);
+ BlobKey key1a = put(server, jobId1, data, blobType);
assertNotNull(key1a);
- BlobKey key1b = put(server, jobId1, data2, highAvailability);
+ BlobKey key1b = put(server, jobId1, data2, blobType);
assertNotNull(key1b);
- verifyContents(server, jobId1, key1a, data, highAvailability);
- verifyContents(server, jobId1, key1b, data2, highAvailability);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
- BlobKey key2a = put(server, jobId2, data, highAvailability);
+ BlobKey key2a = put(server, jobId2, data, blobType);
assertNotNull(key2a);
assertEquals(key1a, key2a);
- BlobKey key2b = put(server, jobId2, data2, highAvailability);
+ BlobKey key2b = put(server, jobId2, data2, blobType);
assertNotNull(key2b);
assertEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
- verifyContents(server, jobId1, key1a, data, highAvailability);
- verifyContents(server, jobId1, key1b, data2, highAvailability);
- verifyContents(server, jobId2, key2a, data, highAvailability);
- verifyContents(server, jobId2, key2b, data2, highAvailability);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
+
+ // verify the accessibility and the BLOB contents one more time (transient BLOBs should
+ // not be deleted here)
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
}
}
@@ -248,27 +259,27 @@ public class BlobServerPutTest extends TestLogger {
@Test
public void testPutStreamSuccessfulGet1() throws IOException {
- testPutStreamSuccessfulGet(null, null, false);
+ testPutStreamSuccessfulGet(null, null, TRANSIENT_BLOB);
}
@Test
public void testPutStreamSuccessfulGet2() throws IOException {
- testPutStreamSuccessfulGet(null, new JobID(), false);
+ testPutStreamSuccessfulGet(null, new JobID(), TRANSIENT_BLOB);
}
@Test
public void testPutStreamSuccessfulGet3() throws IOException {
- testPutStreamSuccessfulGet(new JobID(), new JobID(), false);
+ testPutStreamSuccessfulGet(new JobID(), new JobID(), TRANSIENT_BLOB);
}
@Test
public void testPutStreamSuccessfulGet4() throws IOException {
- testPutStreamSuccessfulGet(new JobID(), null, false);
+ testPutStreamSuccessfulGet(new JobID(), null, TRANSIENT_BLOB);
}
@Test
public void testPutStreamSuccessfulGetHa() throws IOException {
- testPutStreamSuccessfulGet(new JobID(), new JobID(), true);
+ testPutStreamSuccessfulGet(new JobID(), new JobID(), PERMANENT_BLOB);
}
/**
@@ -279,11 +290,11 @@ public class BlobServerPutTest extends TestLogger {
* first job id
* @param jobId2
* second job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>) or not
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private void testPutStreamSuccessfulGet(
- @Nullable JobID jobId1, @Nullable JobID jobId2, boolean highAvailability)
+ @Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType)
throws IOException {
final Configuration config = new Configuration();
@@ -298,29 +309,34 @@ public class BlobServerPutTest extends TestLogger {
byte[] data2 = Arrays.copyOfRange(data, 10, 54);
// put data for jobId1 and verify
- BlobKey key1a = put(server, jobId1, new ByteArrayInputStream(data), highAvailability);
+ BlobKey key1a = put(server, jobId1, new ByteArrayInputStream(data), blobType);
assertNotNull(key1a);
- BlobKey key1b = put(server, jobId1, new ByteArrayInputStream(data2), highAvailability);
+ BlobKey key1b = put(server, jobId1, new ByteArrayInputStream(data2), blobType);
assertNotNull(key1b);
- verifyContents(server, jobId1, key1a, data, highAvailability);
- verifyContents(server, jobId1, key1b, data2, highAvailability);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
- BlobKey key2a = put(server, jobId2, new ByteArrayInputStream(data), highAvailability);
+ BlobKey key2a = put(server, jobId2, new ByteArrayInputStream(data), blobType);
assertNotNull(key2a);
assertEquals(key1a, key2a);
- BlobKey key2b = put(server, jobId2, new ByteArrayInputStream(data2), highAvailability);
+ BlobKey key2b = put(server, jobId2, new ByteArrayInputStream(data2), blobType);
assertNotNull(key2b);
assertEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
- verifyContents(server, jobId1, key1a, data, highAvailability);
- verifyContents(server, jobId1, key1b, data2, highAvailability);
- verifyContents(server, jobId2, key2a, data, highAvailability);
- verifyContents(server, jobId2, key2b, data2, highAvailability);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
+
+ // verify the accessibility and the BLOB contents one more time (transient BLOBs should
+ // not be deleted here)
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
}
}
@@ -328,27 +344,27 @@ public class BlobServerPutTest extends TestLogger {
@Test
public void testPutChunkedStreamSuccessfulGet1() throws IOException {
- testPutChunkedStreamSuccessfulGet(null, null, false);
+ testPutChunkedStreamSuccessfulGet(null, null, TRANSIENT_BLOB);
}
@Test
public void testPutChunkedStreamSuccessfulGet2() throws IOException {
- testPutChunkedStreamSuccessfulGet(null, new JobID(), false);
+ testPutChunkedStreamSuccessfulGet(null, new JobID(), TRANSIENT_BLOB);
}
@Test
public void testPutChunkedStreamSuccessfulGet3() throws IOException {
- testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), false);
+ testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), TRANSIENT_BLOB);
}
@Test
public void testPutChunkedStreamSuccessfulGet4() throws IOException {
- testPutChunkedStreamSuccessfulGet(new JobID(), null, false);
+ testPutChunkedStreamSuccessfulGet(new JobID(), null, TRANSIENT_BLOB);
}
@Test
public void testPutChunkedStreamSuccessfulGetHa() throws IOException {
- testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), true);
+ testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), PERMANENT_BLOB);
}
/**
@@ -359,11 +375,11 @@ public class BlobServerPutTest extends TestLogger {
* first job id
* @param jobId2
* second job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>) or not
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private void testPutChunkedStreamSuccessfulGet(
- @Nullable JobID jobId1, @Nullable JobID jobId2, boolean highAvailability)
+ @Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType)
throws IOException {
final Configuration config = new Configuration();
@@ -378,29 +394,34 @@ public class BlobServerPutTest extends TestLogger {
byte[] data2 = Arrays.copyOfRange(data, 10, 54);
// put data for jobId1 and verify
- BlobKey key1a = put(server, jobId1, new ChunkedInputStream(data, 19), highAvailability);
+ BlobKey key1a = put(server, jobId1, new ChunkedInputStream(data, 19), blobType);
assertNotNull(key1a);
- BlobKey key1b = put(server, jobId1, new ChunkedInputStream(data2, 19), highAvailability);
+ BlobKey key1b = put(server, jobId1, new ChunkedInputStream(data2, 19), blobType);
assertNotNull(key1b);
- verifyContents(server, jobId1, key1a, data, highAvailability);
- verifyContents(server, jobId1, key1b, data2, highAvailability);
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
- BlobKey key2a = put(server, jobId2, new ChunkedInputStream(data, 19), highAvailability);
+ BlobKey key2a = put(server, jobId2, new ChunkedInputStream(data, 19), blobType);
assertNotNull(key2a);
assertEquals(key1a, key2a);
- BlobKey key2b = put(server, jobId2, new ChunkedInputStream(data2, 19), highAvailability);
+ BlobKey key2b = put(server, jobId2, new ChunkedInputStream(data2, 19), blobType);
assertNotNull(key2b);
assertEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
- verifyContents(server, jobId1, key1a, data, highAvailability);
- verifyContents(server, jobId1, key1b, data2, highAvailability);
- verifyContents(server, jobId2, key2a, data, highAvailability);
- verifyContents(server, jobId2, key2b, data2, highAvailability);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
+
+ // verify the accessibility and the BLOB contents one more time (transient BLOBs should
+ // not be deleted here)
+ verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1b, data2);
+ verifyContents(server, jobId2, key2a, data);
+ verifyContents(server, jobId2, key2b, data2);
}
}
@@ -408,17 +429,17 @@ public class BlobServerPutTest extends TestLogger {
@Test
public void testPutBufferFailsNoJob() throws IOException {
- testPutBufferFails(null, false);
+ testPutBufferFails(null, TRANSIENT_BLOB);
}
@Test
public void testPutBufferFailsForJob() throws IOException {
- testPutBufferFails(new JobID(), false);
+ testPutBufferFails(new JobID(), TRANSIENT_BLOB);
}
@Test
public void testPutBufferFailsForJobHa() throws IOException {
- testPutBufferFails(new JobID(), true);
+ testPutBufferFails(new JobID(), PERMANENT_BLOB);
}
/**
@@ -427,10 +448,10 @@ public class BlobServerPutTest extends TestLogger {
*
* @param jobId
* job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>) or not
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testPutBufferFails(@Nullable final JobID jobId, boolean highAvailability)
+ private void testPutBufferFails(@Nullable final JobID jobId, BlobKey.BlobType blobType)
throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
@@ -455,7 +476,7 @@ public class BlobServerPutTest extends TestLogger {
exception.expect(IOException.class);
exception.expectMessage("Cannot create directory ");
- put(server, jobId, data, highAvailability);
+ put(server, jobId, data, blobType);
} finally {
// set writable again to make sure we can remove the directory
@@ -468,17 +489,17 @@ public class BlobServerPutTest extends TestLogger {
@Test
public void testPutBufferFailsIncomingNoJob() throws IOException {
- testPutBufferFailsIncoming(null, false);
+ testPutBufferFailsIncoming(null, TRANSIENT_BLOB);
}
@Test
public void testPutBufferFailsIncomingForJob() throws IOException {
- testPutBufferFailsIncoming(new JobID(), false);
+ testPutBufferFailsIncoming(new JobID(), TRANSIENT_BLOB);
}
@Test
public void testPutBufferFailsIncomingForJobHa() throws IOException {
- testPutBufferFailsIncoming(new JobID(), true);
+ testPutBufferFailsIncoming(new JobID(), PERMANENT_BLOB);
}
/**
@@ -487,10 +508,10 @@ public class BlobServerPutTest extends TestLogger {
*
* @param jobId
* job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>) or not
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testPutBufferFailsIncoming(@Nullable final JobID jobId, boolean highAvailability)
+ private void testPutBufferFailsIncoming(@Nullable final JobID jobId, BlobKey.BlobType blobType)
throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
@@ -516,7 +537,7 @@ public class BlobServerPutTest extends TestLogger {
exception.expectMessage(" (Permission denied)");
try {
- put(server, jobId, data, highAvailability);
+ put(server, jobId, data, blobType);
} finally {
File storageDir = tempFileDir.getParentFile();
// only the incoming directory should exist (no job directory!)
@@ -533,17 +554,17 @@ public class BlobServerPutTest extends TestLogger {
@Test
public void testPutBufferFailsStoreNoJob() throws IOException {
- testPutBufferFailsStore(null, false);
+ testPutBufferFailsStore(null, TRANSIENT_BLOB);
}
@Test
public void testPutBufferFailsStoreForJob() throws IOException {
- testPutBufferFailsStore(new JobID(), false);
+ testPutBufferFailsStore(new JobID(), TRANSIENT_BLOB);
}
@Test
public void testPutBufferFailsStoreForJobHa() throws IOException {
- testPutBufferFailsStore(new JobID(), true);
+ testPutBufferFailsStore(new JobID(), PERMANENT_BLOB);
}
/**
@@ -552,10 +573,10 @@ public class BlobServerPutTest extends TestLogger {
*
* @param jobId
* job id
- * @param highAvailability
- * whether to upload a permanent blob (<tt>true</tt>) or not
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
- private void testPutBufferFailsStore(@Nullable final JobID jobId, boolean highAvailability)
+ private void testPutBufferFailsStore(@Nullable final JobID jobId, BlobKey.BlobType blobType)
throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
@@ -568,7 +589,8 @@ public class BlobServerPutTest extends TestLogger {
server.start();
// make sure the blob server cannot create any files in its storage dir
- jobStoreDir = server.getStorageLocation(jobId, new BlobKey()).getParentFile();
+ jobStoreDir = server.getStorageLocation(jobId,
+ BlobKey.createKey(blobType)).getParentFile();
assertTrue(jobStoreDir.setExecutable(true, false));
assertTrue(jobStoreDir.setReadable(true, false));
assertTrue(jobStoreDir.setWritable(false, false));
@@ -580,7 +602,7 @@ public class BlobServerPutTest extends TestLogger {
exception.expect(AccessDeniedException.class);
try {
- put(server, jobId, data, highAvailability);
+ put(server, jobId, data, blobType);
} finally {
// there should be no remaining incoming files
File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
@@ -600,17 +622,17 @@ public class BlobServerPutTest extends TestLogger {
@Test
public void testConcurrentPutOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentPutOperations(null, false);
+ testConcurrentPutOperations(null, TRANSIENT_BLOB);
}
@Test
public void testConcurrentPutOperationsForJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentPutOperations(new JobID(), false);
+ testConcurrentPutOperations(new JobID(), TRANSIENT_BLOB);
}
@Test
public void testConcurrentPutOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
- testConcurrentPutOperations(new JobID(), true);
+ testConcurrentPutOperations(new JobID(), PERMANENT_BLOB);
}
/**
@@ -620,11 +642,11 @@ public class BlobServerPutTest extends TestLogger {
*
* @param jobId
* job ID to use (or <tt>null</tt> if job-unrelated)
- * @param highAvailability
- * whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*/
private void testConcurrentPutOperations(
- @Nullable final JobID jobId, final boolean highAvailability)
+ @Nullable final JobID jobId, final BlobKey.BlobType blobType)
throws IOException, InterruptedException, ExecutionException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -651,9 +673,9 @@ public class BlobServerPutTest extends TestLogger {
try {
BlockingInputStream inputStream =
new BlockingInputStream(countDownLatch, data);
- BlobKey uploadedKey = put(server, jobId, inputStream, highAvailability);
+ BlobKey uploadedKey = put(server, jobId, inputStream, blobType);
// check the uploaded file's contents (concurrently)
- verifyContents(server, jobId, uploadedKey, data, highAvailability);
+ verifyContents(server, jobId, uploadedKey, data);
return uploadedKey;
} catch (IOException e) {
throw new CompletionException(new FlinkException(
@@ -682,10 +704,10 @@ public class BlobServerPutTest extends TestLogger {
}
// check the uploaded file's contents
- verifyContents(server, jobId, blobKey, data, highAvailability);
+ verifyContents(server, jobId, blobKey, data);
// check that we only uploaded the file once to the blob store
- if (highAvailability) {
+ if (blobType == PERMANENT_BLOB) {
verify(blobStore, times(1)).put(any(File.class), eq(jobId), eq(blobKey));
} else {
// can't really verify much in the other cases other than that the put operations should
@@ -700,35 +722,41 @@ public class BlobServerPutTest extends TestLogger {
// --------------------------------------------------------------------------------------------
/**
- * Helper to choose the right {@link BlobServer#put} method.
+ * Helper to choose the right {@link BlobServer#putTransient} method.
+ *
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*
* @return blob key for the uploaded data
*/
- static BlobKey put(BlobService service, @Nullable JobID jobId, InputStream data, boolean highAvailability)
+ static BlobKey put(BlobService service, @Nullable JobID jobId, InputStream data, BlobKey.BlobType blobType)
throws IOException {
- if (highAvailability) {
+ if (blobType == PERMANENT_BLOB) {
if (service instanceof BlobServer) {
- return ((BlobServer) service).putHA(jobId, data);
+ return ((BlobServer) service).putPermanent(jobId, data);
} else {
throw new UnsupportedOperationException("uploading streams is only possible at the BlobServer");
}
} else if (jobId == null) {
- return service.getTransientBlobStore().put(data);
+ return service.getTransientBlobService().putTransient(data);
} else {
- return service.getTransientBlobStore().put(jobId, data);
+ return service.getTransientBlobService().putTransient(jobId, data);
}
}
/**
- * Helper to choose the right {@link BlobServer#put} method.
+ * Helper to choose the right {@link BlobServer#putTransient} method.
+ *
+ * @param blobType
+ * whether the BLOB should become permanent or transient
*
* @return blob key for the uploaded data
*/
- static BlobKey put(BlobService service, @Nullable JobID jobId, byte[] data, boolean highAvailability)
+ static BlobKey put(BlobService service, @Nullable JobID jobId, byte[] data, BlobKey.BlobType blobType)
throws IOException {
- if (highAvailability) {
+ if (blobType == PERMANENT_BLOB) {
if (service instanceof BlobServer) {
- return ((BlobServer) service).putHA(jobId, data);
+ return ((BlobServer) service).putPermanent(jobId, data);
} else {
// implement via JAR file upload instead:
File tmpFile = Files.createTempFile("blob", ".jar").toFile();
@@ -738,7 +766,7 @@ public class BlobServerPutTest extends TestLogger {
// uploading HA BLOBs works on BlobServer only (and, for now, via the BlobClient)
Configuration clientConfig = new Configuration();
List<Path> jars = Collections.singletonList(new Path(tmpFile.getAbsolutePath()));
- List<BlobKey> keys = BlobClient.uploadJarFiles(serverAddress, clientConfig, jobId, jars);
+ List<PermanentBlobKey> keys = BlobClient.uploadJarFiles(serverAddress, clientConfig, jobId, jars);
assertEquals(1, keys.size());
return keys.get(0);
} finally {
@@ -747,9 +775,9 @@ public class BlobServerPutTest extends TestLogger {
}
}
} else if (jobId == null) {
- return service.getTransientBlobStore().put(data);
+ return service.getTransientBlobService().putTransient(data);
} else {
- return service.getTransientBlobStore().put(jobId, data);
+ return service.getTransientBlobService().putTransient(jobId, data);
}
}
@@ -764,14 +792,12 @@ public class BlobServerPutTest extends TestLogger {
* blob key
* @param data
* expected data
- * @param highAvailability
- * whether to use HA mode accessors
*/
static void verifyContents(
- BlobService blobService, @Nullable JobID jobId, BlobKey key, byte[] data, boolean highAvailability)
+ BlobService blobService, @Nullable JobID jobId, BlobKey key, byte[] data)
throws IOException {
- File file = get(blobService, jobId, key, highAvailability);
+ File file = get(blobService, jobId, key);
validateGetAndClose(new FileInputStream(file), data);
}
@@ -786,14 +812,12 @@ public class BlobServerPutTest extends TestLogger {
* blob key
* @param data
* expected data
- * @param highAvailability
- * whether to use HA mode accessors
*/
static void verifyContents(
- BlobService blobService, @Nullable JobID jobId, BlobKey key, InputStream data,
- boolean highAvailability) throws IOException {
+ BlobService blobService, @Nullable JobID jobId, BlobKey key, InputStream data)
+ throws IOException {
- File file = get(blobService, jobId, key, highAvailability);
+ File file = get(blobService, jobId, key);
validateGetAndClose(new FileInputStream(file), data);
}