You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:06:58 UTC
[05/14] flink git commit: [FLINK-7068][blob] change BlobService
sub-classes for permanent and transient BLOBs
http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 790514c..664dc28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -62,6 +63,7 @@ public class BlobClientSslTest extends BlobClientTest {
config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+ BLOB_SSL_SERVER.start();
sslClientConfig = new Configuration();
sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
@@ -80,6 +82,7 @@ public class BlobClientSslTest extends BlobClientTest {
config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
BLOB_NON_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+ BLOB_NON_SSL_SERVER.start();
nonSslClientConfig = new Configuration();
nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
@@ -110,7 +113,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
/**
- * Verify ssl client to ssl server upload
+ * Verify ssl client to ssl server upload.
*/
@Test
public void testUploadJarFilesHelper() throws Exception {
@@ -118,7 +121,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
/**
- * Verify ssl client to non-ssl server failure
+ * Verify ssl client to non-ssl server failure.
*/
@Test(expected = IOException.class)
public void testSSLClientFailure() throws Exception {
@@ -127,7 +130,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
/**
- * Verify ssl client to non-ssl server failure
+ * Verify ssl client to non-ssl server failure.
*/
@Test(expected = IOException.class)
public void testSSLClientFailure2() throws Exception {
@@ -136,7 +139,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
/**
- * Verify non-ssl client to ssl server failure
+ * Verify non-ssl client to ssl server failure.
*/
@Test(expected = IOException.class)
public void testSSLServerFailure() throws Exception {
@@ -145,7 +148,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
/**
- * Verify non-ssl client to ssl server failure
+ * Verify non-ssl client to ssl server failure.
*/
@Test(expected = IOException.class)
public void testSSLServerFailure2() throws Exception {
@@ -154,7 +157,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
/**
- * Verify non-ssl connection sanity
+ * Verify non-ssl connection sanity.
*/
@Test
public void testNonSSLConnection() throws Exception {
@@ -162,7 +165,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
/**
- * Verify non-ssl connection sanity
+ * Verify non-ssl connection sanity.
*/
@Test
public void testNonSSLConnection2() throws Exception {
@@ -170,7 +173,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
/**
- * Verify non-ssl connection sanity
+ * Verify non-ssl connection sanity.
*/
@Test
public void testNonSSLConnection3() throws Exception {
@@ -178,7 +181,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
/**
- * Verify non-ssl connection sanity
+ * Verify non-ssl connection sanity.
*/
@Test
public void testNonSSLConnection4() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 6d6bfd5..c4444c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -18,15 +18,20 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import javax.annotation.Nullable;
+
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
@@ -37,12 +42,11 @@ import java.net.InetSocketAddress;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.List;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.TestLogger;
+import java.util.Random;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
/**
@@ -56,7 +60,7 @@ public class BlobClientTest extends TestLogger {
/** The instance of the (non-ssl) BLOB server used during the tests. */
static BlobServer BLOB_SERVER;
- /** The blob service (non-ssl) client configuration */
+ /** The blob service (non-ssl) client configuration. */
static Configuration clientConfig;
@ClassRule
@@ -72,6 +76,7 @@ public class BlobClientTest extends TestLogger {
temporaryFolder.newFolder().getAbsolutePath());
BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
+ BLOB_SERVER.start();
clientConfig = new Configuration();
}
@@ -88,7 +93,7 @@ public class BlobClientTest extends TestLogger {
/**
* Creates a test buffer and fills it with a specific byte pattern.
- *
+ *
* @return a test buffer filled with a specific byte pattern
*/
private static byte[] createTestBuffer() {
@@ -102,7 +107,7 @@ public class BlobClientTest extends TestLogger {
/**
* Prepares a test file for the unit tests, i.e. the methods fills the file with a particular byte patterns and
* computes the file's BLOB key.
- *
+ *
* @param file
* the file to prepare for the unit tests
* @return the BLOB key of the prepared file
@@ -139,62 +144,56 @@ public class BlobClientTest extends TestLogger {
/**
* Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of
* the specified buffer.
- *
- * @param inputStream
+ *
+ * @param actualInputStream
* the input stream returned from the GET operation (will be closed by this method)
- * @param buf
+ * @param expectedBuf
* the buffer to compare the input stream's data to
* @throws IOException
* thrown if an I/O error occurs while reading the input stream
*/
- static void validateGetAndClose(final InputStream inputStream, final byte[] buf) throws IOException {
+ static void validateGetAndClose(final InputStream actualInputStream, final byte[] expectedBuf) throws IOException {
try {
- byte[] receivedBuffer = new byte[buf.length];
+ byte[] receivedBuffer = new byte[expectedBuf.length];
int bytesReceived = 0;
while (true) {
- final int read = inputStream
- .read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived);
+ final int read = actualInputStream.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived);
if (read < 0) {
throw new EOFException();
}
bytesReceived += read;
if (bytesReceived == receivedBuffer.length) {
- assertEquals(-1, inputStream.read());
- assertArrayEquals(buf, receivedBuffer);
+ assertEquals(-1, actualInputStream.read());
+ assertArrayEquals(expectedBuf, receivedBuffer);
return;
}
}
} finally {
- inputStream.close();
+ actualInputStream.close();
}
}
/**
* Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of
- * the specified file.
- *
- * @param inputStream
+ * the expected input stream.
+ *
+ * @param actualInputStream
* the input stream returned from the GET operation (will be closed by this method)
- * @param file
- * the file to compare the input stream's data to
+ * @param expectedInputStream
+ * the input stream providing the expected data (will also be closed by this method)
* @throws IOException
- * thrown if an I/O error occurs while reading the input stream or the file
+ * thrown if an I/O error occurs while reading any input stream
*/
- private static void validateGetAndClose(final InputStream inputStream, final File file) throws IOException {
-
- InputStream inputStream2 = null;
+ static void validateGetAndClose(InputStream actualInputStream, InputStream expectedInputStream)
+ throws IOException {
try {
-
- inputStream2 = new FileInputStream(file);
-
while (true) {
-
- final int r1 = inputStream.read();
- final int r2 = inputStream2.read();
+ final int r1 = actualInputStream.read();
+ final int r2 = expectedInputStream.read();
assertEquals(r2, r1);
@@ -202,22 +201,45 @@ public class BlobClientTest extends TestLogger {
break;
}
}
-
} finally {
- if (inputStream2 != null) {
- inputStream2.close();
- }
- inputStream.close();
+ actualInputStream.close();
+ expectedInputStream.close();
}
-
}
/**
- * Tests the PUT/GET operations for content-addressable buffers.
+ * Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of
+ * the specified file.
+ *
+ * @param actualInputStream
+ * the input stream returned from the GET operation (will be closed by this method)
+ * @param expectedFile
+ * the file to compare the input stream's data to
+ * @throws IOException
+ * thrown if an I/O error occurs while reading the input stream or the file
*/
+ @SuppressWarnings("WeakerAccess")
+ static void validateGetAndClose(final InputStream actualInputStream, final File expectedFile) throws IOException {
+ validateGetAndClose(actualInputStream, new FileInputStream(expectedFile));
+ }
+
@Test
- public void testContentAddressableBuffer() throws IOException {
+ public void testContentAddressableBufferTransientBlob() throws IOException {
+ testContentAddressableBuffer(false);
+ }
+ @Test
+ public void testContentAddressableBufferPermantBlob() throws IOException {
+ testContentAddressableBuffer(true);
+ }
+
+ /**
+ * Tests the PUT/GET operations for content-addressable buffers.
+ *
+ * @param permanentBlob
+ * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+ */
+ private void testContentAddressableBuffer(boolean permanentBlob) throws IOException {
BlobClient client = null;
try {
@@ -230,28 +252,37 @@ public class BlobClientTest extends TestLogger {
client = new BlobClient(serverAddress, getBlobClientConfig());
JobID jobId = new JobID();
+ BlobKey receivedKey;
+
+ // Store the data (job-unrelated)
+ if (!permanentBlob) {
+ receivedKey = client.putBuffer(null, testBuffer, 0, testBuffer.length, false);
+ assertEquals(origKey, receivedKey);
+ }
- // Store the data
- BlobKey receivedKey = client.put(null, testBuffer);
- assertEquals(origKey, receivedKey);
// try again with a job-related BLOB:
- receivedKey = client.put(jobId, testBuffer);
+ receivedKey = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, permanentBlob);
assertEquals(origKey, receivedKey);
- // Retrieve the data
- validateGetAndClose(client.get(receivedKey), testBuffer);
- validateGetAndClose(client.get(jobId, receivedKey), testBuffer);
+ // Retrieve the data (job-unrelated)
+ if (!permanentBlob) {
+ validateGetAndClose(client.getInternal(null, receivedKey, false), testBuffer);
+ }
+ // job-related
+ validateGetAndClose(client.getInternal(jobId, receivedKey, permanentBlob), testBuffer);
- // Check reaction to invalid keys
- try (InputStream ignored = client.get(new BlobKey())) {
+ // Check reaction to invalid keys for job-unrelated blobs
+ try (InputStream ignored = client.getInternal(null, new BlobKey(), permanentBlob)) {
fail("Expected IOException did not occur");
}
catch (IOException fnfe) {
// expected
}
+
+ // Check reaction to invalid keys for job-related blobs
// new client needed (closed from failure above)
client = new BlobClient(serverAddress, getBlobClientConfig());
- try (InputStream ignored = client.get(jobId, new BlobKey())) {
+ try (InputStream ignored = client.getInternal(jobId, new BlobKey(), permanentBlob)) {
fail("Expected IOException did not occur");
}
catch (IOException fnfe) {
@@ -275,52 +306,122 @@ public class BlobClientTest extends TestLogger {
return BLOB_SERVER;
}
+ @Test
+ public void testContentAddressableStreamTransientBlob() throws IOException {
+ testContentAddressableStream(false);
+ }
+
+ @Test
+ public void testContentAddressableStreamPermanentBlob() throws IOException {
+ testContentAddressableStream(true);
+ }
+
/**
* Tests the PUT/GET operations for content-addressable streams.
+ *
+ * @param permanentBlob
+ * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
*/
- @Test
- public void testContentAddressableStream() throws IOException {
+ private void testContentAddressableStream(boolean permanentBlob) throws IOException {
- BlobClient client = null;
- InputStream is = null;
+ File testFile = temporaryFolder.newFile();
+ BlobKey origKey = prepareTestFile(testFile);
- try {
- File testFile = File.createTempFile("testfile", ".dat");
- testFile.deleteOnExit();
+ InputStream is = null;
- BlobKey origKey = prepareTestFile(testFile);
-
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
- client = new BlobClient(serverAddress, getBlobClientConfig());
+ try (BlobClient client = new BlobClient(new InetSocketAddress("localhost", getBlobServer().getPort()), getBlobClientConfig())) {
JobID jobId = new JobID();
+ BlobKey receivedKey;
+
+ // Store the data (job-unrelated)
+ if (!permanentBlob) {
+ is = new FileInputStream(testFile);
+ receivedKey = client.putInputStream(null, is, false);
+ assertEquals(origKey, receivedKey);
+ }
- // Store the data
- is = new FileInputStream(testFile);
- BlobKey receivedKey = client.put(is);
- assertEquals(origKey, receivedKey);
// try again with a job-related BLOB:
is = new FileInputStream(testFile);
- receivedKey = client.put(jobId, is);
+ receivedKey = client.putInputStream(jobId, is, permanentBlob);
assertEquals(origKey, receivedKey);
is.close();
is = null;
- // Retrieve the data
- validateGetAndClose(client.get(receivedKey), testFile);
- validateGetAndClose(client.get(jobId, receivedKey), testFile);
- }
- finally {
+ // Retrieve the data (job-unrelated)
+ if (!permanentBlob) {
+ validateGetAndClose(client.getInternal(null, receivedKey, false), testFile);
+ }
+ // job-related
+ validateGetAndClose(client.getInternal(jobId, receivedKey, permanentBlob), testFile);
+ } finally {
if (is != null) {
try {
is.close();
} catch (Throwable ignored) {}
}
- if (client != null) {
- try {
- client.close();
- } catch (Throwable ignored) {}
+ }
+ }
+
+ @Test
+ public void testGetFailsDuringStreamingNoJobTransientBlob() throws IOException {
+ testGetFailsDuringStreaming(null, false);
+ }
+
+ @Test
+ public void testGetFailsDuringStreamingForJobTransientBlob() throws IOException {
+ testGetFailsDuringStreaming(new JobID(), false);
+ }
+
+ @Test
+ public void testGetFailsDuringStreamingForJobPermanentBlob() throws IOException {
+ testGetFailsDuringStreaming(new JobID(), true);
+ }
+
+ /**
+ * Checks the correct result if a GET operation fails during the file download.
+ *
+ * @param jobId
+ * job ID or <tt>null</tt> if job-unrelated
+ * @param permanentBlob
+ * whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+ */
+ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, boolean permanentBlob)
+ throws IOException {
+
+ try (BlobClient client = new BlobClient(
+ new InetSocketAddress("localhost", getBlobServer().getPort()), getBlobClientConfig())) {
+
+ byte[] data = new byte[5000000];
+ Random rnd = new Random();
+ rnd.nextBytes(data);
+
+ // put content addressable (like libraries)
+ BlobKey key = client.putBuffer(jobId, data, 0, data.length, permanentBlob);
+ assertNotNull(key);
+
+ // issue a GET request that succeeds
+ InputStream is = client.getInternal(jobId, key, permanentBlob);
+
+ byte[] receiveBuffer = new byte[data.length];
+ int firstChunkLen = 50000;
+ BlobUtils.readFully(is, receiveBuffer, 0, firstChunkLen, null);
+ BlobUtils.readFully(is, receiveBuffer, firstChunkLen, firstChunkLen, null);
+
+ // close all active server connections to interrupt the download
+ for (BlobServerConnection conn : getBlobServer().getCurrentActiveConnections()) {
+ conn.close();
+ }
+
+ try {
+ BlobUtils.readFully(is, receiveBuffer, 2 * firstChunkLen, data.length - 2 * firstChunkLen, null);
+ // we tolerate that this succeeds, as the receiver socket may have buffered
+ // everything already, but in this case, also verify the contents
+ assertArrayEquals(data, receiveBuffer);
+ }
+ catch (IOException e) {
+ // expected
}
}
}
@@ -357,7 +458,7 @@ public class BlobClientTest extends TestLogger {
assertEquals(1, blobKeys.size());
try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
- validateGetAndClose(blobClient.get(jobId, blobKeys.get(0)), testFile);
+ validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0), true), testFile);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 43bc622..3797f87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -18,21 +18,18 @@
package org.apache.flink.runtime.blob;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
/**
* This class contains unit tests for the {@link BlobKey} class.
*/
@@ -46,7 +43,8 @@ public final class BlobKeyTest extends TestLogger {
* The second key array to be used during the unit tests.
*/
private static final byte[] KEY_ARRAY_2 = new byte[20];
- /**
+
+ /*
* Initialize the key array.
*/
static {
@@ -57,7 +55,7 @@ public final class BlobKeyTest extends TestLogger {
}
/**
- * Tests the serialization/deserialization of BLOB keys
+ * Tests the serialization/deserialization of BLOB keys.
*/
@Test
public void testSerialization() throws Exception {
@@ -99,10 +97,10 @@ public final class BlobKeyTest extends TestLogger {
public void testStreams() throws Exception {
final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
final ByteArrayOutputStream baos = new ByteArrayOutputStream(20);
-
+
k1.writeToOutputStream(baos);
baos.close();
-
+
final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
final BlobKey k2 = BlobKey.readFromInputStream(bais);
http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
deleted file mode 100644
index 81304f4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.blob;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.util.TestLogger;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class BlobRecoveryITCase extends TestLogger {
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- /**
- * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
- * participating BlobServer.
- */
- @Test
- public void testBlobServerRecovery() throws Exception {
- Configuration config = new Configuration();
- config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
- config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
-
- BlobStoreService blobStoreService = null;
-
- try {
- blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
- testBlobServerRecovery(config, blobStoreService);
- } finally {
- if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
- }
- }
- }
-
- public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) throws IOException {
- final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
- String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
- Random rand = new Random();
-
- BlobServer[] server = new BlobServer[2];
- InetSocketAddress[] serverAddress = new InetSocketAddress[2];
- BlobClient client = null;
-
- try {
- for (int i = 0; i < server.length; i++) {
- server[i] = new BlobServer(config, blobStore);
- serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
- }
-
- client = new BlobClient(serverAddress[0], config);
-
- // Random data
- byte[] expected = new byte[1024];
- rand.nextBytes(expected);
-
- BlobKey[] keys = new BlobKey[2];
-
- // Put job-unrelated data
- keys[0] = client.put(null, expected); // Request 1
- keys[1] = client.put(null, expected, 32, 256); // Request 2
-
- // Put job-related data, verify that the checksums match
- JobID[] jobId = new JobID[] { new JobID(), new JobID() };
- BlobKey key;
- key = client.put(jobId[0], expected); // Request 3
- assertEquals(keys[0], key);
- key = client.put(jobId[1], expected, 32, 256); // Request 4
- assertEquals(keys[1], key);
-
- // check that the storage directory exists
- final Path blobServerPath = new Path(storagePath, "blob");
- FileSystem fs = blobServerPath.getFileSystem();
- assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath));
-
- // Close the client and connect to the other server
- client.close();
- client = new BlobClient(serverAddress[1], config);
-
- // Verify request 1
- try (InputStream is = client.get(keys[0])) {
- byte[] actual = new byte[expected.length];
-
- BlobUtils.readFully(is, actual, 0, expected.length, null);
-
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], actual[i]);
- }
- }
-
- // Verify request 2
- try (InputStream is = client.get(keys[1])) {
- byte[] actual = new byte[256];
- BlobUtils.readFully(is, actual, 0, 256, null);
-
- for (int i = 32, j = 0; i < 256; i++, j++) {
- assertEquals(expected[i], actual[j]);
- }
- }
-
- // Verify request 3
- try (InputStream is = client.get(jobId[0], keys[0])) {
- byte[] actual = new byte[expected.length];
- BlobUtils.readFully(is, actual, 0, expected.length, null);
-
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], actual[i]);
- }
- }
-
- // Verify request 4
- try (InputStream is = client.get(jobId[1], keys[1])) {
- byte[] actual = new byte[256];
- BlobUtils.readFully(is, actual, 0, 256, null);
-
- for (int i = 32, j = 0; i < 256; i++, j++) {
- assertEquals(expected[i], actual[j]);
- }
- }
-
- // Remove again
- client.delete(keys[0]);
- client.delete(keys[1]);
- client.delete(jobId[0], keys[0]);
- client.delete(jobId[1], keys[1]);
-
- // Verify everything is clean
- assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));
- if (fs.exists(blobServerPath)) {
- final org.apache.flink.core.fs.FileStatus[] recoveryFiles =
- fs.listStatus(blobServerPath);
- ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length);
- for (org.apache.flink.core.fs.FileStatus file: recoveryFiles) {
- filenames.add(file.toString());
- }
- fail("Unclean state backend: " + filenames);
- }
- }
- finally {
- for (BlobServer s : server) {
- if (s != null) {
- s.close();
- }
- }
-
- if (client != null) {
- client.close();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
new file mode 100644
index 0000000..c4c6762
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests how GET requests react to corrupt files when downloaded via a {@link BlobServer}.
+ *
+ * <p>Successful GET requests are tested in conjunction with the PUT requests.
+ */
+public class BlobServerCorruptionTest extends TestLogger {
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ /**
+ * Checks the GET operation fails when the downloaded file (from {@link BlobServer} or HA store)
+ * is corrupt, i.e. its content's hash does not match the {@link BlobKey}'s hash.
+ */
+ @Test
+ public void testGetFailsFromCorruptFile() throws IOException {
+
+ final Configuration config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
+
+ BlobStoreService blobStoreService = null;
+
+ try {
+ blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+ testGetFailsFromCorruptFile(config, blobStoreService, exception);
+ } finally {
+ if (blobStoreService != null) {
+ blobStoreService.closeAndCleanupAllData();
+ }
+ }
+ }
+
+ /**
+ * Checks the GET operation fails when the downloaded file (from HA store)
+ * is corrupt, i.e. its content's hash does not match the {@link BlobKey}'s hash.
+ *
+ * @param config
+ * blob server configuration (including HA settings like {@link HighAvailabilityOptions#HA_STORAGE_PATH}
+ * and {@link HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up <tt>blobStore</tt>
+ * @param blobStore
+ * shared HA blob store to use
+ * @param expectedException
+ * expected exception rule to use
+ */
+ public static void testGetFailsFromCorruptFile(
+ Configuration config, BlobStore blobStore, ExpectedException expectedException)
+ throws IOException {
+
+ Random rnd = new Random();
+ JobID jobId = new JobID();
+
+ try (BlobServer server = new BlobServer(config, blobStore)) {
+
+ server.start();
+
+ byte[] data = new byte[2000000];
+ rnd.nextBytes(data);
+
+ // put content addressable (like libraries)
+ BlobKey key = put(server, jobId, data, true);
+ assertNotNull(key);
+
+ // delete local file to make sure that the GET request downloads from HA
+ File blobFile = server.getStorageLocation(jobId, key);
+ assertTrue(blobFile.delete());
+
+ // change HA store file contents to make sure that GET requests fail
+ byte[] data2 = Arrays.copyOf(data, data.length);
+ data2[0] ^= 1;
+ File tmpFile = Files.createTempFile("blob", ".jar").toFile();
+ try {
+ FileUtils.writeByteArrayToFile(tmpFile, data2);
+ blobStore.put(tmpFile, jobId, key);
+ } finally {
+ //noinspection ResultOfMethodCallIgnored
+ tmpFile.delete();
+ }
+
+ // issue a GET request that fails
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("data corruption");
+
+ get(server, jobId, key, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index aad8816..bb977d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -30,11 +30,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
@@ -44,16 +45,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
-import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
-import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
-import static org.mockito.Mockito.mock;
/**
* Tests how DELETE requests behave.
@@ -66,315 +65,279 @@ public class BlobServerDeleteTest extends TestLogger {
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
- public void testDeleteSingleByBlobKey() throws IOException {
- BlobServer server = null;
- BlobClient client = null;
- BlobStore blobStore = new VoidBlobStore();
+ public void testDeleteTransient1() throws IOException {
+ testDeleteTransient(null, new JobID());
+ }
+
+ @Test
+ public void testDeleteTransient2() throws IOException {
+ testDeleteTransient(new JobID(), null);
+ }
+
+ @Test
+ public void testDeleteTransient3() throws IOException {
+ testDeleteTransient(null, null);
+ }
+
+ @Test
+ public void testDeleteTransient4() throws IOException {
+ testDeleteTransient(new JobID(), new JobID());
+ }
+
+ /**
+ * Uploads a (different) byte array for each of the given jobs and verifies that deleting one of
+ * them (via the {@link BlobServer}) does not influence the other.
+ *
+ * @param jobId1
+ * first job id
+ * @param jobId2
+ * second job id
+ */
+ private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2)
+ throws IOException {
- try {
- final Configuration config = new Configuration();
- config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
- server = new BlobServer(config, blobStore);
+ try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
+ server.start();
byte[] data = new byte[2000000];
rnd.nextBytes(data);
+ byte[] data2 = Arrays.copyOf(data, data.length);
+ data2[0] ^= 1;
- // put job-unrelated (like libraries)
- BlobKey key1 = client.put(null, data);
+ // put first BLOB
+ BlobKey key1 = put(server, jobId1, data, false);
assertNotNull(key1);
- // second job-unrelated item
- data[0] ^= 1;
- BlobKey key2 = client.put(null, data);
- assertNotNull(key2);
- assertNotEquals(key1, key2);
-
- // put job-related with same key1 as non-job-related
- data[0] ^= 1; // back to the original data
- final JobID jobId = new JobID();
- BlobKey key1b = client.put(jobId, data);
- assertNotNull(key1b);
- assertEquals(key1, key1b);
-
- // issue a DELETE request via the client
- client.delete(key1);
- client.close();
-
- client = new BlobClient(serverAddress, config);
- try (InputStream ignored = client.get(key1)) {
- fail("BLOB should have been deleted");
- }
- catch (IOException e) {
- // expected
- }
+ // put two more BLOBs (same key, other key) for another job ID
+ BlobKey key2a = put(server, jobId2, data, false);
+ assertNotNull(key2a);
+ assertEquals(key1, key2a);
+ BlobKey key2b = put(server, jobId2, data2, false);
+ assertNotNull(key2b);
- ensureClientIsClosed(client);
+ // issue a DELETE request
+ assertTrue(delete(server, jobId1, key1));
- client = new BlobClient(serverAddress, config);
- try {
- // NOTE: the server will stall in its send operation until either the data is fully
- // read or the socket is closed, e.g. via a client.close() call
- validateGetAndClose(client.get(jobId, key1), data);
+ verifyDeleted(server, jobId1, key1, false);
+ // deleting one BLOB should not affect another BLOB, even one with the same key, if the job IDs are different
+ if ((jobId1 == null && jobId2 != null) || (jobId1 != null && !jobId1.equals(jobId2))) {
+ verifyContents(server, jobId2, key2a, data, false);
}
- catch (IOException e) {
- fail("Deleting a job-unrelated BLOB should not affect a job-related BLOB with the same key");
- }
- client.close();
+ verifyContents(server, jobId2, key2b, data2, false);
- // delete a file directly on the server
- server.delete(key2);
- try {
- server.getFile(key2);
- fail("BLOB should have been deleted");
- }
- catch (IOException e) {
- // expected
- }
- }
- finally {
- cleanup(server, client);
- }
- }
+ // delete first file of second job
+ assertTrue(delete(server, jobId2, key2a));
+ verifyDeleted(server, jobId2, key2a, false);
+ verifyContents(server, jobId2, key2b, data2, false);
- private static void ensureClientIsClosed(final BlobClient client) throws IOException {
- try {
- client.put(null, new byte[1]);
- fail("client should be closed after erroneous operation");
- }
- catch (IllegalStateException e) {
- // expected
- } finally {
- client.close();
+ // delete second file of second job (key2b); afterwards both BLOBs of that job must be gone
+ assertTrue(delete(server, jobId2, key2b));
+ verifyDeleted(server, jobId2, key2a, false);
+ verifyDeleted(server, jobId2, key2b, false);
}
}
@Test
- public void testDeleteAlreadyDeletedNoJob() throws IOException {
- testDeleteAlreadyDeleted(null);
+ public void testDeleteTransientAlreadyDeletedNoJob() throws IOException {
+ testDeleteTransientAlreadyDeleted(null);
}
@Test
- public void testDeleteAlreadyDeletedForJob() throws IOException {
- testDeleteAlreadyDeleted(new JobID());
+ public void testDeleteTransientAlreadyDeletedForJob() throws IOException {
+ testDeleteTransientAlreadyDeleted(new JobID());
}
- private void testDeleteAlreadyDeleted(final JobID jobId) throws IOException {
- BlobServer server = null;
- BlobClient client = null;
- BlobStore blobStore = new VoidBlobStore();
+ /**
+ * Uploads a byte array for the given job and verifies that deleting it (via the {@link
+ * BlobServer}) does not fail independent of whether the file exists.
+ *
+ * @param jobId
+ * job id
+ */
+ private void testDeleteTransientAlreadyDeleted(@Nullable final JobID jobId) throws IOException {
- try {
- final Configuration config = new Configuration();
- config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
- server = new BlobServer(config, blobStore);
+ try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
+ server.start();
byte[] data = new byte[2000000];
rnd.nextBytes(data);
- // put file
- BlobKey key = client.put(jobId, data);
+ // put BLOB
+ BlobKey key = put(server, jobId, data, false);
assertNotNull(key);
File blobFile = server.getStorageLocation(jobId, key);
assertTrue(blobFile.delete());
- // issue a DELETE request via the client
- try {
- deleteHelper(client, jobId, key);
- }
- catch (IOException e) {
- fail("DELETE operation should not fail if file is already deleted");
- }
-
- // issue a DELETE request on the server
- if (jobId == null) {
- server.delete(key);
- } else {
- server.delete(jobId, key);
- }
- }
- finally {
- cleanup(server, client);
- }
- }
+ // DELETE operation should not fail if file is already deleted
+ assertTrue(delete(server, jobId, key));
+ verifyDeleted(server, jobId, key, false);
- private static void deleteHelper(BlobClient client, JobID jobId, BlobKey key) throws IOException {
- if (jobId == null) {
- client.delete(key);
- } else {
- client.delete(jobId, key);
+ // one more delete call that should not fail
+ assertTrue(delete(server, jobId, key));
+ verifyDeleted(server, jobId, key, false);
}
}
@Test
- public void testDeleteFailsNoJob() throws IOException {
- testDeleteFails(null);
+ public void testDeleteTransientFailsNoJob() throws IOException {
+ testDeleteTransientFails(null);
}
@Test
- public void testDeleteFailsForJob() throws IOException {
- testDeleteFails(new JobID());
+ public void testDeleteTransientFailsForJob() throws IOException {
+ testDeleteTransientFails(new JobID());
}
- private void testDeleteFails(final JobID jobId) throws IOException {
+ /**
+ * Uploads a byte array for the given job and verifies that a delete operation (via the {@link
+ * BlobServer}) does not fail even if the file is not deletable, e.g. via restricting the
+ * permissions.
+ *
+ * @param jobId
+ * job id
+ */
+ private void testDeleteTransientFails(@Nullable final JobID jobId) throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
- BlobServer server = null;
- BlobClient client = null;
- BlobStore blobStore = new VoidBlobStore();
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
File blobFile = null;
File directory = null;
- try {
- final Configuration config = new Configuration();
- config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
- server = new BlobServer(config, blobStore);
+ try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
-
- byte[] data = new byte[2000000];
- rnd.nextBytes(data);
+ server.start();
- // put content addressable (like libraries)
- BlobKey key = client.put(jobId, data);
- assertNotNull(key);
-
- blobFile = server.getStorageLocation(jobId, key);
- directory = blobFile.getParentFile();
-
- assertTrue(blobFile.setWritable(false, false));
- assertTrue(directory.setWritable(false, false));
-
- // issue a DELETE request via the client
- deleteHelper(client, jobId, key);
-
- // issue a DELETE request on the server
- if (jobId == null) {
- server.delete(key);
- } else {
- server.delete(jobId, key);
- }
-
- // the file should still be there
- if (jobId == null) {
- server.getFile(key);
- } else {
- server.getFile(jobId, key);
- }
- } finally {
- if (blobFile != null && directory != null) {
- //noinspection ResultOfMethodCallIgnored
- blobFile.setWritable(true, false);
- //noinspection ResultOfMethodCallIgnored
- directory.setWritable(true, false);
+ try {
+ byte[] data = new byte[2000000];
+ rnd.nextBytes(data);
+
+ // put BLOB
+ BlobKey key = put(server, jobId, data, false);
+ assertNotNull(key);
+
+ blobFile = server.getStorageLocation(jobId, key);
+ directory = blobFile.getParentFile();
+
+ assertTrue(blobFile.setWritable(false, false));
+ assertTrue(directory.setWritable(false, false));
+
+ // issue a DELETE request
+ assertFalse(delete(server, jobId, key));
+
+ // the file should still be there
+ verifyContents(server, jobId, key, data, false);
+ } finally {
+ if (blobFile != null && directory != null) {
+ //noinspection ResultOfMethodCallIgnored
+ blobFile.setWritable(true, false);
+ //noinspection ResultOfMethodCallIgnored
+ directory.setWritable(true, false);
+ }
}
- cleanup(server, client);
}
}
- /**
- * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob(JobID)}.
- */
@Test
public void testJobCleanup() throws IOException, InterruptedException {
+ testJobCleanup(false);
+ }
+
+ @Test
+ public void testJobCleanupHa() throws IOException, InterruptedException {
+ testJobCleanup(true);
+ }
+ /**
+ * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob(JobID)}.
+ *
+ * @param highAvailability
+ * whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+ */
+ private void testJobCleanup(boolean highAvailability) throws IOException {
JobID jobId1 = new JobID();
- List<BlobKey> keys1 = new ArrayList<BlobKey>();
JobID jobId2 = new JobID();
- List<BlobKey> keys2 = new ArrayList<BlobKey>();
- BlobServer server = null;
- final byte[] buf = new byte[128];
+ Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ temporaryFolder.newFolder().getAbsolutePath());
- try {
- Configuration config = new Configuration();
- config.setString(BlobServerOptions.STORAGE_DIRECTORY,
- temporaryFolder.newFolder().getAbsolutePath());
+ try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
- server = new BlobServer(config, new VoidBlobStore());
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- BlobClient bc = new BlobClient(serverAddress, config);
+ server.start();
- keys1.add(bc.put(jobId1, buf));
- keys2.add(bc.put(jobId2, buf));
- assertEquals(keys2.get(0), keys1.get(0));
+ final byte[] data = new byte[128];
+ byte[] data2 = Arrays.copyOf(data, data.length);
+ data2[0] ^= 1;
- buf[0] += 1;
- keys1.add(bc.put(jobId1, buf));
+ BlobKey key1a = put(server, jobId1, data, highAvailability);
+ BlobKey key2 = put(server, jobId2, data, highAvailability);
+ assertEquals(key1a, key2);
- bc.close();
+ BlobKey key1b = put(server, jobId1, data2, highAvailability);
- assertEquals(2, checkFilesExist(jobId1, keys1, server, true));
+ verifyContents(server, jobId1, key1a, data, highAvailability);
+ verifyContents(server, jobId1, key1b, data2, highAvailability);
checkFileCountForJob(2, jobId1, server);
- assertEquals(1, checkFilesExist(jobId2, keys2, server, true));
+
+ verifyContents(server, jobId2, key2, data, highAvailability);
checkFileCountForJob(1, jobId2, server);
server.cleanupJob(jobId1);
+ verifyDeleted(server, jobId1, key1a, highAvailability);
+ verifyDeleted(server, jobId1, key1b, highAvailability);
checkFileCountForJob(0, jobId1, server);
- assertEquals(1, checkFilesExist(jobId2, keys2, server, true));
+ verifyContents(server, jobId2, key2, data, highAvailability);
checkFileCountForJob(1, jobId2, server);
server.cleanupJob(jobId2);
checkFileCountForJob(0, jobId1, server);
+ verifyDeleted(server, jobId2, key2, highAvailability);
checkFileCountForJob(0, jobId2, server);
// calling a second time should not fail
server.cleanupJob(jobId2);
}
- finally {
- if (server != null) {
- server.close();
- }
- }
}
- /**
- * FLINK-6020
- *
- * Tests that concurrent delete operations don't interfere with each other.
- *
- * Note: The test checks that there cannot be two threads which have checked whether a given blob file exist
- * and then one of them fails deleting it. Without the introduced lock, this situation should rarely happen
- * and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is most likely
- * broken.
- */
@Test
public void testConcurrentDeleteOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
testConcurrentDeleteOperations(null);
}
- /**
- * FLINK-6020
- *
- * Tests that concurrent delete operations don't interfere with each other.
- *
- * Note: The test checks that there cannot be two threads which have checked whether a given blob file exist
- * and then one of them fails deleting it. Without the introduced lock, this situation should rarely happen
- * and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is most likely
- * broken.
- */
@Test
public void testConcurrentDeleteOperationsForJob() throws IOException, ExecutionException, InterruptedException {
testConcurrentDeleteOperations(new JobID());
}
- private void testConcurrentDeleteOperations(final JobID jobId)
- throws IOException, InterruptedException, ExecutionException {
+ /**
+ * [FLINK-6020] Tests that concurrent delete operations don't interfere with each other.
+ *
+ * <p>Note: This test checks that there cannot be two threads which have checked whether a given
+ * blob file exists and then one of them fails deleting it. Without the introduced lock, this
+ * situation should rarely happen and make this test fail. Thus, if this test should become
+ * "unstable", then the delete atomicity is most likely broken.
+ *
+ * @param jobId
+ * job ID to use (or <tt>null</tt> if job-unrelated)
+ */
+ private void testConcurrentDeleteOperations(@Nullable final JobID jobId)
+ throws IOException, InterruptedException, ExecutionException {
+
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
- final BlobStore blobStore = mock(BlobStore.class);
final int concurrentDeleteOperations = 3;
final ExecutorService executor = Executors.newFixedThreadPool(concurrentDeleteOperations);
@@ -383,28 +346,27 @@ public class BlobServerDeleteTest extends TestLogger {
final byte[] data = {1, 2, 3};
- try (final BlobServer blobServer = new BlobServer(config, blobStore)) {
+ try (final BlobServer server = new BlobServer(config, new VoidBlobStore())) {
- final BlobKey blobKey;
+ server.start();
- try (BlobClient client = blobServer.createClient()) {
- blobKey = client.put(jobId, data);
- }
+ final BlobKey blobKey = put(server, jobId, data, false);
- assertTrue(blobServer.getStorageLocation(jobId, blobKey).exists());
+ assertTrue(server.getStorageLocation(jobId, blobKey).exists());
for (int i = 0; i < concurrentDeleteOperations; i++) {
CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync(
() -> {
- try (BlobClient blobClient = blobServer.createClient()) {
- deleteHelper(blobClient, jobId, blobKey);
+ try {
+ assertTrue(delete(server, jobId, blobKey));
+ assertFalse(server.getStorageLocation(jobId, blobKey).exists());
+ return null;
} catch (IOException e) {
- throw new CompletionException(new FlinkException("Could not delete the given blob key " + blobKey + '.', e));
+ throw new CompletionException(new FlinkException(
+ "Could not delete the given blob key " + blobKey + '.', e)); // keep the I/O failure as the cause
}
-
- return null;
},
- executor);
+ executor);
deleteFutures.add(deleteFuture);
}
@@ -415,26 +377,30 @@ public class BlobServerDeleteTest extends TestLogger {
// in case of no lock, one of the delete operations should eventually fail
waitFuture.get();
- assertFalse(blobServer.getStorageLocation(jobId, blobKey).exists());
+ assertFalse(server.getStorageLocation(jobId, blobKey).exists());
+
} finally {
executor.shutdownNow();
}
}
- private static void cleanup(BlobServer server, BlobClient client) {
- if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
- if (server != null) {
- try {
- server.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
+ /**
+ * Deletes a transient BLOB from the given BLOB service.
+ *
+ * @param service
+ * blob service
+ * @param jobId
+ * job ID (or <tt>null</tt> if job-unrelated)
+ * @param key
+ * blob key
+ *
+ * @return delete success
+ */
+ static boolean delete(BlobService service, @Nullable JobID jobId, BlobKey key) {
+ if (jobId == null) {
+ return service.getTransientBlobStore().delete(key);
+ } else {
+ return service.getTransientBlobStore().delete(jobId, key);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index d3d3484..0873aba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -21,27 +21,33 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import javax.annotation.Nullable;
+
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.NoSuchFileException;
import java.security.MessageDigest;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
@@ -51,22 +57,23 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
/**
- * Tests how failing GET requests behave in the presence of failures.
- * Successful GET requests are tested in conjunction wit the PUT
- * requests.
+ * Tests how failing GET requests behave in the presence of failures when used with a {@link
+ * BlobServer}.
+ *
+ * <p>Successful GET requests are tested in conjunction with the PUT requests.
*/
public class BlobServerGetTest extends TestLogger {
@@ -75,19 +82,27 @@ public class BlobServerGetTest extends TestLogger {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
@Test
public void testGetFailsDuringLookup1() throws IOException {
- testGetFailsDuringLookup(null, new JobID());
+ testGetFailsDuringLookup(null, new JobID(), false);
}
@Test
public void testGetFailsDuringLookup2() throws IOException {
- testGetFailsDuringLookup(new JobID(), new JobID());
+ testGetFailsDuringLookup(new JobID(), new JobID(), false);
}
@Test
public void testGetFailsDuringLookup3() throws IOException {
- testGetFailsDuringLookup(new JobID(), null);
+ testGetFailsDuringLookup(new JobID(), null, false);
+ }
+
+ @Test
+ public void testGetFailsDuringLookupHa() throws IOException {
+ testGetFailsDuringLookup(new JobID(), new JobID(), true);
}
/**
@@ -96,25 +111,21 @@ public class BlobServerGetTest extends TestLogger {
* @param jobId1 first job ID or <tt>null</tt> if job-unrelated
* @param jobId2 second job ID different to <tt>jobId1</tt>
*/
- private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2)
- throws IOException {
- BlobServer server = null;
- BlobClient client = null;
-
- try {
- final Configuration config = new Configuration();
- config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+ private void testGetFailsDuringLookup(
+ @Nullable final JobID jobId1, @Nullable final JobID jobId2, boolean highAvailability)
+ throws IOException {
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
- server = new BlobServer(config, new VoidBlobStore());
+ try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
+ server.start();
byte[] data = new byte[2000000];
rnd.nextBytes(data);
// put content addressable (like libraries)
- BlobKey key = client.put(jobId1, data);
+ BlobKey key = put(server, jobId1, data, highAvailability);
assertNotNull(key);
// delete file to make sure that GET requests fail
@@ -122,148 +133,233 @@ public class BlobServerGetTest extends TestLogger {
assertTrue(blobFile.delete());
// issue a GET request that fails
- client = verifyDeleted(client, jobId1, key, serverAddress, config);
+ verifyDeleted(server, jobId1, key, highAvailability);
- BlobKey key2 = client.put(jobId2, data);
+ // add the same data under a second jobId
+ BlobKey key2 = put(server, jobId2, data, highAvailability);
assertNotNull(key);
assertEquals(key, key2);
+
// request for jobId2 should succeed
- validateGetAndClose(getFileHelper(client, jobId2, key), data);
+ get(server, jobId2, key, highAvailability);
// request for jobId1 should still fail
- client = verifyDeleted(client, jobId1, key, serverAddress, config);
+ verifyDeleted(server, jobId1, key, highAvailability);
// same checks as for jobId1 but for jobId2 should also work:
blobFile = server.getStorageLocation(jobId2, key);
assertTrue(blobFile.delete());
- client = verifyDeleted(client, jobId2, key, serverAddress, config);
-
- } finally {
- if (client != null) {
- client.close();
- }
- if (server != null) {
- server.close();
- }
+ verifyDeleted(server, jobId2, key, highAvailability);
}
}
/**
- * Checks that the given blob does not exist anymore.
- *
- * @param client
- * BLOB client to use for connecting to the BLOB server
- * @param jobId
- * job ID or <tt>null</tt> if job-unrelated
- * @param key
- * key identifying the BLOB to request
- * @param serverAddress
- * BLOB server address
- * @param config
- * client config
- *
- * @return a new client (since the old one is being closed on failure)
+ * Retrieves a BLOB from the HA store to a {@link BlobServer} which cannot create incoming
+ * files. File transfers should fail.
*/
- private static BlobClient verifyDeleted(
- BlobClient client, JobID jobId, BlobKey key,
- InetSocketAddress serverAddress, Configuration config) throws IOException {
- try (InputStream ignored = getFileHelper(client, jobId, key)) {
- fail("This should not succeed.");
- } catch (IOException e) {
- // expected
- }
- // need a new client (old ony closed due to failure
- return new BlobClient(serverAddress, config);
- }
-
@Test
- public void testGetFailsDuringStreamingNoJob() throws IOException {
- testGetFailsDuringStreaming(null);
- }
+ public void testGetFailsIncomingForJobHa() throws IOException {
+ assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
- @Test
- public void testGetFailsDuringStreamingForJob() throws IOException {
- testGetFailsDuringStreaming(new JobID());
+ final JobID jobId = new JobID();
+
+ final Configuration config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
+
+ BlobStoreService blobStore = null;
+
+ try {
+ blobStore = BlobUtils.createBlobStoreFromConfig(config);
+
+ File tempFileDir = null;
+ try (BlobServer server = new BlobServer(config, blobStore)) {
+
+ server.start();
+
+ // store the data on the server (and blobStore), remove from local store
+ byte[] data = new byte[2000000];
+ rnd.nextBytes(data);
+ BlobKey blobKey = put(server, jobId, data, true);
+ assertTrue(server.getStorageLocation(jobId, blobKey).delete());
+
+ // make sure the blob server cannot create any files in its storage dir
+ tempFileDir = server.createTemporaryFilename().getParentFile();
+ assertTrue(tempFileDir.setExecutable(true, false));
+ assertTrue(tempFileDir.setReadable(true, false));
+ assertTrue(tempFileDir.setWritable(false, false));
+
+ // request the file from the BlobStore
+ exception.expect(IOException.class);
+ exception.expectMessage("Permission denied");
+
+ try {
+ get(server, jobId, blobKey, true);
+ } finally {
+ HashSet<String> expectedDirs = new HashSet<>();
+ expectedDirs.add("incoming");
+ expectedDirs.add(JOB_DIR_PREFIX + jobId);
+ // only the incoming and job directories should exist (nothing else)
+ File storageDir = tempFileDir.getParentFile();
+ String[] actualDirs = storageDir.list();
+ assertNotNull(actualDirs);
+ assertEquals(expectedDirs, new HashSet<>(Arrays.asList(actualDirs)));
+
+ // job directory should be empty
+ File jobDir = new File(tempFileDir.getParentFile(), JOB_DIR_PREFIX + jobId);
+ assertArrayEquals(new String[] {}, jobDir.list());
+ }
+ } finally {
+ // set writable again to make sure we can remove the directory
+ if (tempFileDir != null) {
+ //noinspection ResultOfMethodCallIgnored
+ tempFileDir.setWritable(true, false);
+ }
+ }
+ } finally {
+ if (blobStore != null) {
+ blobStore.closeAndCleanupAllData();
+ }
+ }
}
/**
- * Checks the correct result if a GET operation fails during the file download.
- *
- * @param jobId job ID or <tt>null</tt> if job-unrelated
+ * Retrieves a BLOB from the HA store to a {@link BlobServer} which cannot create the final
+ * storage file. File transfers should fail.
*/
- private void testGetFailsDuringStreaming(final JobID jobId) throws IOException {
- BlobServer server = null;
- BlobClient client = null;
+ @Test
+ public void testGetFailsStoreForJobHa() throws IOException {
+ assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
- try {
- final Configuration config = new Configuration();
- config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+ final JobID jobId = new JobID();
+
+ final Configuration config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
- server = new BlobServer(config, new VoidBlobStore());
+ BlobStoreService blobStore = null;
- InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- client = new BlobClient(serverAddress, config);
+ try {
+ blobStore = BlobUtils.createBlobStoreFromConfig(config);
- byte[] data = new byte[5000000];
- rnd.nextBytes(data);
+ File jobStoreDir = null;
+ try (BlobServer server = new BlobServer(config, blobStore)) {
- // put content addressable (like libraries)
- BlobKey key = client.put(jobId, data);
- assertNotNull(key);
+ server.start();
- // issue a GET request that succeeds
- InputStream is = getFileHelper(client, jobId, key);
+ // store the data on the server (and blobStore), remove from local store
+ byte[] data = new byte[2000000];
+ rnd.nextBytes(data);
+ BlobKey blobKey = put(server, jobId, data, true);
+ assertTrue(server.getStorageLocation(jobId, blobKey).delete());
- byte[] receiveBuffer = new byte[data.length];
- int firstChunkLen = 50000;
- BlobUtils.readFully(is, receiveBuffer, 0, firstChunkLen, null);
- BlobUtils.readFully(is, receiveBuffer, firstChunkLen, firstChunkLen, null);
+ // make sure the blob server cannot create any files in the job's storage dir
+ jobStoreDir = server.getStorageLocation(jobId, blobKey).getParentFile();
+ assertTrue(jobStoreDir.setExecutable(true, false));
+ assertTrue(jobStoreDir.setReadable(true, false));
+ assertTrue(jobStoreDir.setWritable(false, false));
- // shut down the server
- for (BlobServerConnection conn : server.getCurrentActiveConnections()) {
- conn.close();
- }
+ // request the file from the BlobStore
+ exception.expect(AccessDeniedException.class);
- try {
- BlobUtils.readFully(is, receiveBuffer, 2 * firstChunkLen, data.length - 2 * firstChunkLen, null);
- // we tolerate that this succeeds, as the receiver socket may have buffered
- // everything already, but in this case, also verify the contents
- assertArrayEquals(data, receiveBuffer);
- }
- catch (IOException e) {
- // expected
+ try {
+ get(server, jobId, blobKey, true);
+ } finally {
+ // there should be no remaining incoming files
+ File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
+ assertArrayEquals(new String[] {}, incomingFileDir.list());
+
+ // there should be no files in the job directory
+ assertArrayEquals(new String[] {}, jobStoreDir.list());
+ }
+ } finally {
+ // set writable again to make sure we can remove the directory
+ if (jobStoreDir != null) {
+ //noinspection ResultOfMethodCallIgnored
+ jobStoreDir.setWritable(true, false);
+ }
}
- is.close();
} finally {
- if (client != null) {
- client.close();
- }
- if (server != null) {
- server.close();
+ if (blobStore != null) {
+ blobStore.closeAndCleanupAllData();
}
}
}
/**
- * FLINK-6020
- *
- * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
+ * Retrieves a BLOB from the HA store to a {@link BlobServer} whose HA store does not contain
+ * the file. File transfers should fail.
*/
@Test
+ public void testGetFailsHaStoreForJobHa() throws IOException {
+ final JobID jobId = new JobID();
+
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+ try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
+
+ server.start();
+
+ // store the data on the server (and blobStore), remove from local store
+ byte[] data = new byte[2000000];
+ rnd.nextBytes(data);
+ BlobKey blobKey = put(server, jobId, data, true);
+ assertTrue(server.getStorageLocation(jobId, blobKey).delete());
+
+ File tempFileDir = server.createTemporaryFilename().getParentFile();
+
+ // request the file from the BlobStore
+ exception.expect(NoSuchFileException.class);
+
+ try {
+ get(server, jobId, blobKey, true);
+ } finally {
+ HashSet<String> expectedDirs = new HashSet<>();
+ expectedDirs.add("incoming");
+ expectedDirs.add(JOB_DIR_PREFIX + jobId);
+ // only the incoming and job directories should exist (the job directory must be empty, see below)
+ File storageDir = tempFileDir.getParentFile();
+ String[] actualDirs = storageDir.list();
+ assertNotNull(actualDirs);
+ assertEquals(expectedDirs, new HashSet<>(Arrays.asList(actualDirs)));
+
+ // job directory should be empty
+ File jobDir = new File(tempFileDir.getParentFile(), JOB_DIR_PREFIX + jobId);
+ assertArrayEquals(new String[] {}, jobDir.list());
+ }
+ }
+ }
+
+ @Test
public void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentGetOperations(null);
+ testConcurrentGetOperations(null, false);
}
- /**
- * FLINK-6020
- *
- * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
- */
@Test
public void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
- testConcurrentGetOperations(new JobID());
+ testConcurrentGetOperations(new JobID(), false);
+ }
+
+ @Test
+ public void testConcurrentGetOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
+ testConcurrentGetOperations(new JobID(), true);
}
- private void testConcurrentGetOperations(final JobID jobId)
+ /**
+ * [FLINK-6020] Tests that concurrent get operations don't concurrently access the BlobStore to
+ * download a blob.
+ *
+ * @param jobId
+ * job ID to use (or <tt>null</tt> if job-unrelated)
+ * @param highAvailability
+ * whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+ */
+ private void testConcurrentGetOperations(
+ @Nullable final JobID jobId, final boolean highAvailability)
throws IOException, InterruptedException, ExecutionException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -271,10 +367,9 @@ public class BlobServerGetTest extends TestLogger {
final BlobStore blobStore = mock(BlobStore.class);
final int numberConcurrentGetOperations = 3;
- final List<CompletableFuture<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+ final List<CompletableFuture<File>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
final byte[] data = {1, 2, 3, 4, 99, 42};
- final ByteArrayInputStream bais = new ByteArrayInputStream(data);
MessageDigest md = BlobUtils.createMessageDigest();
@@ -287,7 +382,7 @@ public class BlobServerGetTest extends TestLogger {
public Object answer(InvocationOnMock invocation) throws Throwable {
File targetFile = (File) invocation.getArguments()[2];
- FileUtils.copyInputStreamToFile(bais, targetFile);
+ FileUtils.writeByteArrayToFile(targetFile, data);
return null;
}
@@ -296,19 +391,29 @@ public class BlobServerGetTest extends TestLogger {
final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
- try (final BlobServer blobServer = new BlobServer(config, blobStore)) {
- for (int i = 0; i < numberConcurrentGetOperations; i++) {
- CompletableFuture<InputStream> getOperation = CompletableFuture.supplyAsync(
- () -> {
- try (BlobClient blobClient = blobServer.createClient();
- InputStream inputStream = getFileHelper(blobClient, jobId, blobKey)) {
- byte[] buffer = new byte[data.length];
+ try (final BlobServer server = new BlobServer(config, blobStore)) {
+
+ server.start();
- IOUtils.readFully(inputStream, buffer);
+ // upload data first
+ assertEquals(blobKey, put(server, jobId, data, highAvailability));
- return new ByteArrayInputStream(buffer);
+ // now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
+ if (highAvailability) {
+ // remove local copy so that a transfer from HA store takes place
+ assertTrue(server.getStorageLocation(jobId, blobKey).delete());
+ }
+ for (int i = 0; i < numberConcurrentGetOperations; i++) {
+ CompletableFuture<File> getOperation = CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ File file = get(server, jobId, blobKey, highAvailability);
+ // check that we have read the right data
+ validateGetAndClose(new FileInputStream(file), data);
+ return file;
} catch (IOException e) {
- throw new CompletionException(new FlinkException("Could not read blob for key " + blobKey + '.', e));
+ throw new CompletionException(new FlinkException(
+ "Could not read blob for key " + blobKey + '.', e));
}
},
executor);
@@ -316,37 +421,63 @@ public class BlobServerGetTest extends TestLogger {
getOperations.add(getOperation);
}
- CompletableFuture<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
-
- Collection<InputStream> inputStreams = inputStreamsFuture.get();
-
- // check that we have read the right data
- for (InputStream inputStream : inputStreams) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
-
- IOUtils.copy(inputStream, baos);
-
- baos.close();
- byte[] input = baos.toByteArray();
-
- assertArrayEquals(data, input);
-
- inputStream.close();
- }
-
- // verify that we downloaded the requested blob exactly once from the BlobStore
- verify(blobStore, times(1)).get(eq(jobId), eq(blobKey), any(File.class));
+ CompletableFuture<Collection<File>> filesFuture = FutureUtils.combineAll(getOperations);
+ filesFuture.get();
} finally {
executor.shutdownNow();
}
}
- static InputStream getFileHelper(BlobClient blobClient, JobID jobId, BlobKey blobKey)
- throws IOException {
- if (jobId == null) {
- return blobClient.get(blobKey);
+ /**
+ * Retrieves the given blob.
+ *
+ * <p>Note that if a {@link BlobCache} is used, it may try to access the {@link BlobServer} to
+ * retrieve the blob.
+ *
+ * @param service
+ * BLOB service to retrieve the BLOB from
+ * @param jobId
+ * job ID or <tt>null</tt> if job-unrelated
+ * @param key
+ * key identifying the BLOB to request
+ * @param highAvailability
+ * whether to use the permanent (HA) BLOB accessor (<tt>true</tt>) or the transient one (<tt>false</tt>)
+ */
+ static File get(
+ BlobService service, @Nullable JobID jobId, BlobKey key, boolean highAvailability)
+ throws IOException {
+ if (highAvailability) {
+ return service.getPermanentBlobStore().getHAFile(jobId, key);
+ } else if (jobId == null) {
+ return service.getTransientBlobStore().getFile(key);
} else {
- return blobClient.get(jobId, blobKey);
+ return service.getTransientBlobStore().getFile(jobId, key);
+ }
+ }
+
+ /**
+ * Checks that the given blob does not exist anymore by trying to access it.
+ *
+ * <p>Note that if a {@link BlobCache} is used, it may try to access the {@link BlobServer} to
+ * retrieve the blob.
+ *
+ * @param service
+ * BLOB service to request the BLOB from
+ * @param jobId
+ * job ID or <tt>null</tt> if job-unrelated
+ * @param key
+ * key identifying the BLOB to request
+ * @param highAvailability
+ * whether to use the permanent (HA) BLOB accessor (<tt>true</tt>) or the transient one (<tt>false</tt>)
+ */
+ static void verifyDeleted(
+ BlobService service, @Nullable JobID jobId, BlobKey key, boolean highAvailability)
+ throws IOException {
+ try {
+ get(service, jobId, key, highAvailability);
+ fail("File " + jobId + "/" + key + " should have been deleted.");
+ } catch (IOException e) {
+ // expected
}
}
}