You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:18 UTC
[04/19] flink git commit: [FLINK-5129] [distributed runtime]
BlobCache to directly access Blobs from distributed file system if
possible
[FLINK-5129] [distributed runtime] BlobCache to directly access Blobs from distributed file system if possible
This closes #3084
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f544d83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f544d83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f544d83
Branch: refs/heads/master
Commit: 9f544d83b3443cf33f5890efdb956678847d445f
Parents: e68ee5c
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Nov 22 12:49:03 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Feb 18 19:19:34 2017 +0100
----------------------------------------------------------------------
.../handlers/TaskManagerLogHandler.java | 2 +-
.../apache/flink/runtime/blob/BlobCache.java | 269 +++++++++++--------
.../apache/flink/runtime/blob/BlobClient.java | 3 +-
.../apache/flink/runtime/blob/BlobServer.java | 56 +---
.../runtime/blob/BlobServerConnection.java | 8 +
.../apache/flink/runtime/blob/BlobStore.java | 29 +-
.../apache/flink/runtime/blob/BlobUtils.java | 75 +++---
.../flink/runtime/blob/FileSystemBlobStore.java | 34 +--
.../flink/runtime/blob/VoidBlobStore.java | 9 +-
.../apache/flink/runtime/client/JobClient.java | 8 +-
.../librarycache/BlobLibraryCacheManager.java | 13 +-
.../highavailability/ZookeeperHaServices.java | 20 +-
.../runtime/taskexecutor/TaskExecutor.java | 19 +-
.../runtime/blob/BlobCacheRetriesTest.java | 86 +++++-
.../runtime/blob/BlobCacheSuccessTest.java | 76 +++++-
.../flink/runtime/blob/BlobRecoveryITCase.java | 31 +--
.../runtime/blob/BlobServerDeleteTest.java | 66 ++---
.../flink/runtime/blob/BlobServerRangeTest.java | 1 +
.../flink/runtime/blob/BlobUtilsTest.java | 6 +-
.../BlobLibraryCacheRecoveryITCase.java | 18 +-
20 files changed, 498 insertions(+), 331 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 78c4455..6583d3b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -150,7 +150,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
scala.concurrent.Future<Object> portFuture = jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
scala.concurrent.Future<BlobCache> cacheFuture = portFuture.map(new Mapper<Object, BlobCache>() {
@Override
- public BlobCache apply(Object result) {
+ public BlobCache checkedApply(Object result) throws IOException {
Option<String> hostOption = jobManager.actor().path().address().host();
String host = hostOption.isDefined() ? hostOption.get() : "localhost";
int port = (int) result;
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 7ef1f04..2587b15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -20,12 +20,12 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.util.FileUtils;
-
+import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -35,10 +35,17 @@ import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the
- * {@link BlobCache#getURL} methods, the BLOB cache will first attempt serve the file from its local cache. Only if the
- * local cache does not contain the desired BLOB, the BLOB cache will try to download it from the BLOB server.
+ * The BLOB cache implements a local cache for content-addressable BLOBs.
+ *
+ * <p>When requesting BLOBs through the {@link BlobCache#getURL} methods, the
+ * BLOB cache will first attempt to serve the file from its local cache. Only if
+ * the local cache does not contain the desired BLOB, the BLOB cache will try to
+ * download it from a distributed file system (if available) or the BLOB
+ * server.</p>
*/
public final class BlobCache implements BlobService {
@@ -47,8 +54,12 @@ public final class BlobCache implements BlobService {
private final InetSocketAddress serverAddress;
+ /** Root directory for local file storage */
private final File storageDir;
+ /** Blob store for distributed file storage, e.g. in HA */
+ private final BlobStore blobStore;
+
private final AtomicBoolean shutdownRequested = new AtomicBoolean();
/** Shutdown hook thread to ensure deletion of the storage directory. */
@@ -60,15 +71,62 @@ public final class BlobCache implements BlobService {
/** Configuration for the blob client like ssl parameters required to connect to the blob server */
private final Configuration blobClientConfig;
+ /**
+ * Instantiates a new BLOB cache.
+ *
+ * @param serverAddress
+ * address of the {@link BlobServer} to use for fetching files from
+ * @param blobClientConfig
+ * global configuration
+ *
+ * @throws IOException
+ * thrown if the (local or distributed) file storage cannot be created or
+ * is not usable
+ */
+ public BlobCache(InetSocketAddress serverAddress,
+ Configuration blobClientConfig) throws IOException {
+ this(serverAddress, blobClientConfig,
+ BlobUtils.createBlobStoreFromConfig(blobClientConfig));
+ }
- public BlobCache(InetSocketAddress serverAddress, Configuration blobClientConfig) {
- if (serverAddress == null || blobClientConfig == null) {
- throw new NullPointerException();
- }
-
- this.serverAddress = serverAddress;
+ /**
+ * Instantiates a new BLOB cache.
+ *
+ * @param serverAddress
+ * address of the {@link BlobServer} to use for fetching files from
+ * @param blobClientConfig
+ * global configuration
+ * @param haServices
+ * high availability services able to create a distributed blob store
+ *
+ * @throws IOException
+ * thrown if the (local or distributed) file storage cannot be created or
+ * is not usable
+ */
+ public BlobCache(InetSocketAddress serverAddress,
+ Configuration blobClientConfig, HighAvailabilityServices haServices) throws IOException {
+ this(serverAddress, blobClientConfig, haServices.createBlobStore());
+ }
- this.blobClientConfig = blobClientConfig;
+ /**
+ * Instantiates a new BLOB cache.
+ *
+ * @param serverAddress
+ * address of the {@link BlobServer} to use for fetching files from
+ * @param blobClientConfig
+ * global configuration
+ * @param blobStore
+ * (distributed) blob store file system to retrieve files from first
+ *
+ * @throws IOException
+ * thrown if the (local or distributed) file storage cannot be created or is not usable
+ */
+ private BlobCache(
+ final InetSocketAddress serverAddress, final Configuration blobClientConfig,
+ final BlobStore blobStore) throws IOException {
+ this.serverAddress = checkNotNull(serverAddress);
+ this.blobClientConfig = checkNotNull(blobClientConfig);
+ this.blobStore = blobStore;
// configure and create the storage directory
String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
@@ -101,92 +159,101 @@ public final class BlobCache implements BlobService {
* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
*/
public URL getURL(final BlobKey requiredBlob) throws IOException {
- if (requiredBlob == null) {
- throw new IllegalArgumentException("BLOB key cannot be null.");
- }
+ checkArgument(requiredBlob != null, "BLOB key cannot be null.");
final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
- if (!localJarFile.exists()) {
+ if (localJarFile.exists()) {
+ return localJarFile.toURI().toURL();
+ }
- final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+ // first try the distributed blob store (if available)
+ try {
+ blobStore.get(requiredBlob, localJarFile);
+ } catch (Exception e) {
+ LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
+ }
- // loop over retries
- int attempt = 0;
- while (true) {
+ if (localJarFile.exists()) {
+ return localJarFile.toURI().toURL();
+ }
- if (attempt == 0) {
- LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
- } else {
- LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
- }
+ // fallback: download from the BlobServer
+ final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
- try {
- BlobClient bc = null;
- InputStream is = null;
- OutputStream os = null;
-
- try {
- bc = new BlobClient(serverAddress, blobClientConfig);
- is = bc.get(requiredBlob);
- os = new FileOutputStream(localJarFile);
-
- while (true) {
- final int read = is.read(buf);
- if (read < 0) {
- break;
- }
- os.write(buf, 0, read);
- }
+ // loop over retries
+ int attempt = 0;
+ while (true) {
- // we do explicitly not use a finally block, because we want the closing
- // in the regular case to throw exceptions and cause the writing to fail.
- // But, the closing on exception should not throw further exceptions and
- // let us keep the root exception
- os.close();
- os = null;
- is.close();
- is = null;
- bc.close();
- bc = null;
-
- // success, we finished
- break;
- }
- catch (Throwable t) {
- // we use "catch (Throwable)" to keep the root exception. Otherwise that exception
- // it would be replaced by any exception thrown in the finally block
- closeSilently(os);
- closeSilently(is);
- closeSilently(bc);
-
- if (t instanceof IOException) {
- throw (IOException) t;
- } else {
- throw new IOException(t.getMessage(), t);
+ if (attempt == 0) {
+ LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
+ } else {
+ LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
+ }
+
+ try {
+ BlobClient bc = null;
+ InputStream is = null;
+ OutputStream os = null;
+
+ try {
+ bc = new BlobClient(serverAddress, blobClientConfig);
+ is = bc.get(requiredBlob);
+ os = new FileOutputStream(localJarFile);
+
+ while (true) {
+ final int read = is.read(buf);
+ if (read < 0) {
+ break;
}
+ os.write(buf, 0, read);
}
+
+ // we do explicitly not use a finally block, because we want the closing
+ // in the regular case to throw exceptions and cause the writing to fail.
+ // But, the closing on exception should not throw further exceptions and
+ // let us keep the root exception
+ os.close();
+ os = null;
+ is.close();
+ is = null;
+ bc.close();
+ bc = null;
+
+ // success, we finished
+ return localJarFile.toURI().toURL();
}
- catch (IOException e) {
- String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
- " and store it under " + localJarFile.getAbsolutePath();
- if (attempt < numFetchRetries) {
- attempt++;
- if (LOG.isDebugEnabled()) {
- LOG.debug(message + " Retrying...", e);
- } else {
- LOG.error(message + " Retrying...");
- }
+ catch (Throwable t) {
+ // we use "catch (Throwable)" to keep the root exception. Otherwise that exception
+ // it would be replaced by any exception thrown in the finally block
+ IOUtils.closeQuietly(os);
+ IOUtils.closeQuietly(is);
+ IOUtils.closeQuietly(bc);
+
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ } else {
+ throw new IOException(t.getMessage(), t);
}
- else {
- LOG.error(message + " No retries left.", e);
- throw new IOException(message, e);
+ }
+ }
+ catch (IOException e) {
+ String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
+ " and store it under " + localJarFile.getAbsolutePath();
+ if (attempt < numFetchRetries) {
+ attempt++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(message + " Retrying...", e);
+ } else {
+ LOG.error(message + " Retrying...");
}
}
- } // end loop over retries
- }
-
- return localJarFile.toURI().toURL();
+ else {
+ LOG.error(message + " No retries left.", e);
+ throw new IOException(message, e);
+ }
+ }
+ } // end loop over retries
}
/**
@@ -202,18 +269,23 @@ public final class BlobCache implements BlobService {
}
/**
- * Deletes the file associated with the given key from the BLOB cache and BLOB server.
+ * Deletes the file associated with the given key from the BLOB cache and
+ * BLOB server.
+ *
* @param key referring to the file to be deleted
+ * @throws IOException
+ * thrown if an I/O error occurs while transferring the request to
+ * the BLOB server or if the BLOB server cannot delete the file
*/
public void deleteGlobal(BlobKey key) throws IOException {
- BlobClient bc = createClient();
- try {
- delete(key);
+ // delete locally
+ delete(key);
+ // then delete on the BLOB server
+ // (don't use the distributed storage directly - this way the blob
+ // server is aware of the delete operation, too)
+ try (BlobClient bc = createClient()) {
bc.delete(key);
}
- finally {
- bc.close();
- }
}
@Override
@@ -258,19 +330,4 @@ public final class BlobCache implements BlobService {
return this.storageDir;
}
- // ------------------------------------------------------------------------
- // Miscellaneous
- // ------------------------------------------------------------------------
-
- private void closeSilently(Closeable closeable) {
- if (closeable != null) {
- try {
- closeable.close();
- } catch (Throwable t) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Error while closing resource after BLOB transfer.", t);
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 2748967..ea90f54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -590,7 +590,8 @@ public final class BlobClient implements Closeable {
* @param key
* the key to identify the BLOB
* @throws IOException
- * thrown if an I/O error occurs while transferring the request to the BLOB server
+ * thrown if an I/O error occurs while transferring the request to
+ * the BLOB server or if the BLOB server cannot delete the file
*/
public void delete(BlobKey key) throws IOException {
if (key == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index d4190a7..5b00ae4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -21,10 +21,6 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.net.SSLUtils;
@@ -49,8 +45,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
/**
* This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
@@ -77,10 +73,10 @@ public class BlobServer extends Thread implements BlobService {
/** Indicates whether a shutdown of server component has been requested. */
private final AtomicBoolean shutdownRequested = new AtomicBoolean();
- /** Is the root directory for file storage */
+ /** Root directory for local file storage */
private final File storageDir;
- /** Blob store for HA */
+ /** Blob store for distributed file storage, e.g. in HA */
private final BlobStore blobStore;
/** Set of currently running threads */
@@ -99,10 +95,11 @@ public class BlobServer extends Thread implements BlobService {
* Instantiates a new BLOB server and binds it to a free network port.
*
* @throws IOException
- * thrown if the BLOB server cannot bind to a free network port
+ * thrown if the BLOB server cannot bind to a free network port or if the
+ * (local or distributed) file storage cannot be created or is not usable
*/
public BlobServer(Configuration config) throws IOException {
- this(config, createBlobStoreFromConfig(config));
+ this(config, BlobUtils.createBlobStoreFromConfig(config));
}
public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException {
@@ -110,11 +107,9 @@ public class BlobServer extends Thread implements BlobService {
}
private BlobServer(Configuration config, BlobStore blobStore) throws IOException {
- checkNotNull(config);
+ this.blobServiceConfiguration = checkNotNull(config);
this.blobStore = checkNotNull(blobStore);
- this.blobServiceConfiguration = config;
-
// configure and create the storage directory
String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
@@ -358,9 +353,7 @@ public class BlobServer extends Thread implements BlobService {
*/
@Override
public URL getURL(BlobKey requiredBlob) throws IOException {
- if (requiredBlob == null) {
- throw new IllegalArgumentException("Required BLOB cannot be null.");
- }
+ checkArgument(requiredBlob != null, "BLOB key cannot be null.");
final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
@@ -450,37 +443,4 @@ public class BlobServer extends Thread implements BlobService {
}
}
- private static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException {
- HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
-
- if (highAvailabilityMode == HighAvailabilityMode.NONE) {
- return new VoidBlobStore();
- } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
- final String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
- if (isNullOrWhitespaceOnly(storagePath)) {
- throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
- HighAvailabilityOptions.HA_STORAGE_PATH);
- }
-
- final Path path;
- try {
- path = new Path(storagePath);
- } catch (Exception e) {
- throw new IOException("Invalid path for highly available storage (" +
- HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
- }
-
- final FileSystem fileSystem;
- try {
- fileSystem = path.getFileSystem();
- } catch (Exception e) {
- throw new IOException("Could not create FileSystem for highly available storage (" +
- HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
- }
-
- return new FileSystemBlobStore(fileSystem, storagePath);
- } else {
- throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + ".");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 321fc67..13a90c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -168,6 +168,14 @@ class BlobServerConnection extends Thread {
* thrown if an I/O error occurs while reading/writing data from/to the respective streams
*/
private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
+ /**
+ * Retrieve the file from the (distributed?) BLOB store and store it
+ * locally, then send it to the service which requested it.
+ *
+ * Instead, we could send it from the distributed store directly but
+ * chances are high that if there is one request, there will be more
+ * so a local cache makes more sense.
+ */
File blobFile;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 7050338..64dc942 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import java.io.File;
+import java.io.IOException;
/**
* A blob store.
@@ -32,9 +33,9 @@ public interface BlobStore {
*
* @param localFile The file to copy
* @param blobKey The ID for the file in the blob store
- * @throws Exception If the copy fails
+ * @throws IOException If the copy fails
*/
- void put(File localFile, BlobKey blobKey) throws Exception;
+ void put(File localFile, BlobKey blobKey) throws IOException;
/**
* Copies a local file to the blob store.
@@ -44,18 +45,18 @@ public interface BlobStore {
* @param localFile The file to copy
* @param jobId The JobID part of ID for the file in the blob store
* @param key The String part of ID for the file in the blob store
- * @throws Exception If the copy fails
+ * @throws IOException If the copy fails
*/
- void put(File localFile, JobID jobId, String key) throws Exception;
+ void put(File localFile, JobID jobId, String key) throws IOException;
/**
* Copies a blob to a local file.
*
* @param blobKey The blob ID
* @param localFile The local file to copy to
- * @throws Exception If the copy fails
+ * @throws IOException If the copy fails
*/
- void get(BlobKey blobKey, File localFile) throws Exception;
+ void get(BlobKey blobKey, File localFile) throws IOException;
/**
* Copies a blob to a local file.
@@ -63,19 +64,23 @@ public interface BlobStore {
* @param jobId The JobID part of ID for the blob
* @param key The String part of ID for the blob
* @param localFile The local file to copy to
- * @throws Exception If the copy fails
+ * @throws IOException If the copy fails
*/
- void get(JobID jobId, String key, File localFile) throws Exception;
+ void get(JobID jobId, String key, File localFile) throws IOException;
/**
- * Deletes a blob.
+ * Tries to delete a blob from storage.
+ *
+ * <p>NOTE: This also tries to delete any created directories if empty.</p>
*
* @param blobKey The blob ID
*/
void delete(BlobKey blobKey);
/**
- * Deletes a blob.
+ * Tries to delete a blob from storage.
+ *
+ * <p>NOTE: This also tries to delete any created directories if empty.</p>
*
* @param jobId The JobID part of ID for the blob
* @param key The String part of ID for the blob
@@ -83,7 +88,9 @@ public interface BlobStore {
void delete(JobID jobId, String key);
/**
- * Deletes blobs.
+ * Tries to delete all blobs for the given job from storage.
+ *
+ * <p>NOTE: This also tries to delete any created directories if empty.</p>
*
* @param jobId The JobID part of all blobs to delete
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index aeaa602..b5ba565 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -21,20 +21,19 @@ package org.apache.flink.runtime.blob;
import com.google.common.io.BaseEncoding;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.IOUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.ZookeeperHaServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import java.io.EOFException;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
-import java.net.URI;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -68,11 +67,39 @@ public class BlobUtils {
static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
/**
+ * Creates a BlobStore based on the parameters set in the configuration.
+ *
+ * @param config
+ * configuration to use
+ *
+ * @return a (distributed) blob store for high availability
+ *
+ * @throws IOException
+ * thrown if the (distributed) file storage cannot be created
+ */
+ static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException {
+ HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
+
+ if (highAvailabilityMode == HighAvailabilityMode.NONE) {
+ return new VoidBlobStore();
+ } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
+ return ZookeeperHaServices.createBlobStore(config);
+ } else {
+ throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'.");
+ }
+ }
+
+ /**
* Creates a storage directory for a blob service.
*
* @return the storage directory used by a BLOB service
+ *
+ * @throws IOException
+ * thrown if the (local or distributed) file storage cannot be created or
+ * is not usable
*/
- static File initStorageDirectory(String storageDirectory) {
+ static File initStorageDirectory(String storageDirectory) throws
+ IOException {
File baseDir;
if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) {
baseDir = new File(System.getProperty("java.io.tmpdir"));
@@ -96,7 +123,7 @@ public class BlobUtils {
}
// max attempts exceeded to find a storage directory
- throw new RuntimeException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
+ throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
}
/**
@@ -341,7 +368,7 @@ public class BlobUtils {
*/
static String getRecoveryPath(String basePath, BlobKey blobKey) {
// format: $base/cache/blob_$key
- return String.format("%s/cache/%s", basePath, BLOB_FILE_PREFIX + blobKey.toString());
+ return String.format("%s/cache/%s%s", basePath, BLOB_FILE_PREFIX, blobKey.toString());
}
/**
@@ -353,8 +380,8 @@ public class BlobUtils {
*/
static String getRecoveryPath(String basePath, JobID jobId, String key) {
// format: $base/job_$id/blob_$key
- return String.format("%s/%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString(),
- BLOB_FILE_PREFIX + encodeKey(key));
+ return String.format("%s/%s%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString(),
+ BLOB_FILE_PREFIX, encodeKey(key));
}
/**
@@ -363,33 +390,7 @@ public class BlobUtils {
* <p>The returned path can be used with the state backend for recovery purposes.
*/
static String getRecoveryPath(String basePath, JobID jobId) {
- return String.format("%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString());
- }
-
- /**
- * Copies the file from the recovery path to the local file.
- */
- static void copyFromRecoveryPath(String recoveryPath, File localBlobFile) throws Exception {
- if (recoveryPath == null) {
- throw new IllegalStateException("Failed to determine recovery path.");
- }
-
- if (!localBlobFile.createNewFile()) {
- throw new IllegalStateException("Failed to create new local file to copy to");
- }
-
- URI uri = new URI(recoveryPath);
- Path path = new Path(recoveryPath);
-
- if (FileSystem.get(uri).exists(path)) {
- try (InputStream is = FileSystem.get(uri).open(path)) {
- FileOutputStream fos = new FileOutputStream(localBlobFile);
- IOUtils.copyBytes(is, fos); // closes the streams
- }
- }
- else {
- throw new IOException("Cannot find required BLOB at '" + recoveryPath + "' for recovery.");
- }
+ return String.format("%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 2c05002..7cfce7a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -64,16 +64,16 @@ public class FileSystemBlobStore implements BlobStore {
// - Put ------------------------------------------------------------------
@Override
- public void put(File localFile, BlobKey blobKey) throws Exception {
+ public void put(File localFile, BlobKey blobKey) throws IOException {
put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey));
}
@Override
- public void put(File localFile, JobID jobId, String key) throws Exception {
+ public void put(File localFile, JobID jobId, String key) throws IOException {
put(localFile, BlobUtils.getRecoveryPath(basePath, jobId, key));
}
- private void put(File fromFile, String toBlobPath) throws Exception {
+ private void put(File fromFile, String toBlobPath) throws IOException {
try (OutputStream os = fileSystem.create(new Path(toBlobPath), true)) {
LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
Files.copy(fromFile, os);
@@ -83,16 +83,16 @@ public class FileSystemBlobStore implements BlobStore {
// - Get ------------------------------------------------------------------
@Override
- public void get(BlobKey blobKey, File localFile) throws Exception {
+ public void get(BlobKey blobKey, File localFile) throws IOException {
get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile);
}
@Override
- public void get(JobID jobId, String key, File localFile) throws Exception {
+ public void get(JobID jobId, String key, File localFile) throws IOException {
get(BlobUtils.getRecoveryPath(basePath, jobId, key), localFile);
}
- private void get(String fromBlobPath, File toFile) throws Exception {
+ private void get(String fromBlobPath, File toFile) throws IOException {
checkNotNull(fromBlobPath, "Blob path");
checkNotNull(toFile, "File");
@@ -102,17 +102,21 @@ public class FileSystemBlobStore implements BlobStore {
final Path fromPath = new Path(fromBlobPath);
- if (fileSystem.exists(fromPath)) {
- try (InputStream is = fileSystem.open(fromPath);
- FileOutputStream fos = new FileOutputStream(toFile))
- {
- LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
- IOUtils.copyBytes(is, fos); // closes the streams
+ boolean success = false;
+ try (InputStream is = fileSystem.open(fromPath);
+ FileOutputStream fos = new FileOutputStream(toFile)) {
+ LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
+ IOUtils.copyBytes(is, fos); // closes the streams
+ success = true;
+ } finally {
+ // if the copy fails, we need to remove the target file because
+ // outside code relies on a correct file as long as it exists
+ if (!success) {
+ try {
+ toFile.delete();
+ } catch (Throwable ignored) {}
}
}
- else {
- throw new IOException(fromBlobPath + " does not exist.");
- }
}
// - Delete ---------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index ece2ac1..8606844 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import java.io.File;
+import java.io.IOException;
/**
* A blob store doing nothing.
@@ -28,19 +29,19 @@ import java.io.File;
public class VoidBlobStore implements BlobStore {
@Override
- public void put(File localFile, BlobKey blobKey) throws Exception {
+ public void put(File localFile, BlobKey blobKey) throws IOException {
}
@Override
- public void put(File localFile, JobID jobId, String key) throws Exception {
+ public void put(File localFile, JobID jobId, String key) throws IOException {
}
@Override
- public void get(BlobKey blobKey, File localFile) throws Exception {
+ public void get(BlobKey blobKey, File localFile) throws IOException {
}
@Override
- public void get(JobID jobId, String key, File localFile) throws Exception {
+ public void get(JobID jobId, String key, File localFile) throws IOException {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 9f0c573..76d6d86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -209,7 +209,13 @@ public class JobClient {
Option<String> jmHost = jobManager.actor().path().address().host();
String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
- final BlobCache blobClient = new BlobCache(serverAddress, config);
+ final BlobCache blobClient;
+ try {
+ blobClient = new BlobCache(serverAddress, config);
+ } catch (IOException e) {
+ throw new JobRetrievalException(jobID,
+ "Failed to setup blob cache", e);
+ }
final Collection<BlobKey> requiredJarFiles = props.requiredJarFiles();
final Collection<URL> requiredClasspaths = props.requiredClasspaths();
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index c94768d..b0d5d83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -37,11 +37,12 @@ import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* For each job graph that is submitted to the system the library cache manager maintains
* a set of libraries (typically JAR files) which the job requires to run. The library cache manager
@@ -73,7 +74,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
// --------------------------------------------------------------------------------------------
public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval) {
- this.blobService = blobService;
+ this.blobService = checkNotNull(blobService);
// Initializing the clean up task
this.cleanupTimer = new Timer(true);
@@ -91,8 +92,8 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
@Override
public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey> requiredJarFiles,
Collection<URL> requiredClasspaths) throws IOException {
- Preconditions.checkNotNull(jobId, "The JobId must not be null.");
- Preconditions.checkNotNull(task, "The task execution id must not be null.");
+ checkNotNull(jobId, "The JobId must not be null.");
+ checkNotNull(task, "The task execution id must not be null.");
if (requiredJarFiles == null) {
requiredJarFiles = Collections.emptySet();
@@ -153,8 +154,8 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
@Override
public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
- Preconditions.checkNotNull(jobId, "The JobId must not be null.");
- Preconditions.checkNotNull(task, "The task execution id must not be null.");
+ checkNotNull(jobId, "The JobId must not be null.");
+ checkNotNull(task, "The task execution id must not be null.");
synchronized (lockObject) {
LibraryCacheEntry entry = cacheEntries.get(jobId);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 25d21ef..ed0ad17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -154,7 +154,21 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
@Override
public BlobStore createBlobStore() throws IOException {
- final String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+ return createBlobStore(configuration);
+ }
+
+ /**
+ * Creates the BLOB store in which BLOBs are stored in a highly-available
+ * fashion.
+ *
+ * @param configuration configuration to extract the storage path from
+ * @return Blob store
+ * @throws IOException if the blob store could not be created
+ */
+ public static BlobStore createBlobStore(
+ final Configuration configuration) throws IOException {
+ String storagePath = configuration.getValue(
+ HighAvailabilityOptions.HA_STORAGE_PATH);
if (isNullOrWhitespaceOnly(storagePath)) {
throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
HighAvailabilityOptions.HA_STORAGE_PATH);
@@ -176,6 +190,10 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
}
+ final String clusterId =
+ configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+ storagePath += "/" + clusterId;
+
return new FileSystemBlobStore(fileSystem, storagePath);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f11cb98..58bbfac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -794,11 +794,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
- BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration());
-
- LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
- blobCache,
- taskManagerConfiguration.getCleanupInterval());
+ final LibraryCacheManager libraryCacheManager;
+ try {
+ final BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration(), haServices);
+ libraryCacheManager = new BlobLibraryCacheManager(
+ blobCache,
+ taskManagerConfiguration.getCleanupInterval());
+ } catch (IOException e) {
+ // Can't pass the IOException up - we need a RuntimeException anyway
+ // two levels up where this is run asynchronously. Also, we don't
+ // know whether this is caught in the thread running this method.
+ final String message = "Could not create BLOB cache or library cache.";
+ log.error(message, e);
+ throw new RuntimeException(message, e);
+ }
ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
jobManagerLeaderId,
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 4aa9a21..34a8a39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -19,7 +19,10 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.io.InputStream;
@@ -33,22 +36,52 @@ import static org.junit.Assert.*;
*/
public class BlobCacheRetriesTest {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
/**
* A test where the connection fails twice and then the get operation succeeds.
*/
@Test
public void testBlobFetchRetries() {
+ final Configuration config = new Configuration();
+
+ testBlobFetchRetries(config);
+ }
+ /**
+ * A test where the connection fails twice and then the get operation succeeds
+ * (with high availability set).
+ */
+ @Test
+ public void testBlobFetchRetriesHa() {
+ final Configuration config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+ temporaryFolder.getRoot().getPath());
+
+ testBlobFetchRetries(config);
+ }
+
+ /**
+ * A test where the BlobCache must use the BlobServer and the connection
+ * fails twice and then the get operation succeeds.
+ *
+ * @param config
+ * configuration to use (the BlobCache will get some additional settings
+ * set compared to this one)
+ */
+ private void testBlobFetchRetries(final Configuration config) {
final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
BlobServer server = null;
BlobCache cache = null;
try {
- final Configuration config = new Configuration();
server = new TestingFailingBlobServer(config, 2);
- final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+ final InetSocketAddress
+ serverAddress = new InetSocketAddress("localhost", server.getPort());
// upload some blob
BlobClient blobClient = null;
@@ -64,9 +97,15 @@ public class BlobCacheRetriesTest {
}
}
- cache = new BlobCache(serverAddress, config);
+ // create a separate config for the cache with no access to
+ // the (shared) storage path if available so that the cache
+ // will always bother the BlobServer!
+ final Configuration cacheConfig = new Configuration(config);
+ cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+ temporaryFolder.getRoot().getPath() + "/does-not-exist");
+ cache = new BlobCache(serverAddress, cacheConfig);
- // trigger a download - it should fail on the first time, but retry, and succeed at the second time
+ // trigger a download - it should fail the first two times, but retry, and succeed eventually
URL url = cache.getURL(key);
InputStream is = url.openStream();
try {
@@ -97,17 +136,44 @@ public class BlobCacheRetriesTest {
*/
@Test
public void testBlobFetchWithTooManyFailures() {
+ final Configuration config = new Configuration();
+ testBlobFetchWithTooManyFailures(config);
+ }
+
+ /**
+ * A test where the connection fails too often which eventually fails the
+ * GET request (with high availability set).
+ */
+ @Test
+ public void testBlobFetchWithTooManyFailuresHa() {
+ final Configuration config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+ temporaryFolder.getRoot().getPath());
+
+ testBlobFetchWithTooManyFailures(config);
+ }
+
+ /**
+ * A test where the BlobCache must use the BlobServer and the connection
+ * fails too often which eventually fails the GET request.
+ *
+ * @param config
+ * configuration to use (the BlobCache will get some additional settings
+ * set compared to this one)
+ */
+ private void testBlobFetchWithTooManyFailures(final Configuration config) {
final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
BlobServer server = null;
BlobCache cache = null;
try {
- final Configuration config = new Configuration();
server = new TestingFailingBlobServer(config, 10);
- final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+ final InetSocketAddress
+ serverAddress = new InetSocketAddress("localhost", server.getPort());
// upload some blob
BlobClient blobClient = null;
@@ -123,7 +189,13 @@ public class BlobCacheRetriesTest {
}
}
- cache = new BlobCache(serverAddress, config);
+ // create a separate config for the cache with no access to
+ // the (shared) storage path if available so that the cache
+ // will always bother the BlobServer!
+ final Configuration cacheConfig = new Configuration(config);
+ cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+ temporaryFolder.getRoot().getPath() + "/does-not-exist");
+ cache = new BlobCache(serverAddress, cacheConfig);
// trigger a download - it should fail eventually
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 7ba5a8a..db55331 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -18,6 +18,12 @@
package org.apache.flink.runtime.blob;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
import java.io.File;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
@@ -25,9 +31,6 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.List;
-import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -38,9 +41,48 @@ import static org.junit.Assert.fail;
*/
public class BlobCacheSuccessTest {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ /**
+ * BlobCache with no HA. BLOBs need to be downloaded from a working
+ * BlobServer.
+ */
@Test
public void testBlobCache() {
+ Configuration config = new Configuration();
+ uploadFileGetTest(config, false, false);
+ }
+
+ /**
+ * BlobCache is configured in HA mode and the cache can download files from
+ * the file system directly and does not need to download BLOBs from the
+ * BlobServer.
+ */
+ @Test
+ public void testBlobCacheHa() {
+ Configuration config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+ temporaryFolder.getRoot().getPath());
+ uploadFileGetTest(config, true, true);
+ }
+ /**
+ * BlobCache is configured in HA mode but the cache itself cannot access the
+ * file system and thus needs to download BLOBs from the BlobServer.
+ */
+ @Test
+ public void testBlobCacheHaFallback() {
+ Configuration config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+ temporaryFolder.getRoot().getPath());
+ uploadFileGetTest(config, false, false);
+ }
+
+ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWithoutServer,
+ boolean cacheHasAccessToFs) {
// First create two BLOBs and upload them to BLOB server
final byte[] buf = new byte[128];
final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
@@ -50,7 +92,6 @@ public class BlobCacheSuccessTest {
try {
// Start the BLOB server
- Configuration config = new Configuration();
blobServer = new BlobServer(config);
final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort());
@@ -69,15 +110,34 @@ public class BlobCacheSuccessTest {
}
}
- blobCache = new BlobCache(serverAddress, new Configuration());
+ if (cacheWorksWithoutServer) {
+ // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
+ blobServer.shutdown();
+ blobServer = null;
+ }
+
+ final Configuration cacheConfig;
+ if (cacheHasAccessToFs) {
+ cacheConfig = config;
+ } else {
+ // just in case parameters are still read from the server,
+ // create a separate configuration object for the cache
+ cacheConfig = new Configuration(config);
+ cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+ temporaryFolder.getRoot().getPath() + "/does-not-exist");
+ }
+
+ blobCache = new BlobCache(serverAddress, cacheConfig);
for (BlobKey blobKey : blobKeys) {
blobCache.getURL(blobKey);
}
- // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
- blobServer.shutdown();
- blobServer = null;
+ if (blobServer != null) {
+ // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
+ blobServer.shutdown();
+ blobServer = null;
+ }
final URL[] urls = new URL[blobKeys.size()];
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index a8eb1d3..d043665 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.blob;
-import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -26,9 +25,9 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
@@ -39,27 +38,14 @@ import java.util.Arrays;
import java.util.Random;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class BlobRecoveryITCase {
- private File recoveryDir;
-
- @Before
- public void setUp() throws Exception {
- recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
- if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
- throw new IllegalStateException("Failed to create temp directory for test");
- }
- }
-
- @After
- public void cleanUp() throws Exception {
- if (recoveryDir != null) {
- FileUtils.deleteDirectory(recoveryDir);
- }
- }
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
@@ -70,13 +56,14 @@ public class BlobRecoveryITCase {
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());
testBlobServerRecovery(config);
}
public static void testBlobServerRecovery(final Configuration config) throws IOException {
- String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
+ final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
+ String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
Random rand = new Random();
BlobServer[] server = new BlobServer[2];
@@ -84,7 +71,6 @@ public class BlobRecoveryITCase {
BlobClient client = null;
try {
-
for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config);
serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
@@ -165,6 +151,7 @@ public class BlobRecoveryITCase {
client.delete(jobId[1], testKey[1]);
// Verify everything is clean
+ assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));
if (fs.exists(blobServerPath)) {
final org.apache.flink.core.fs.FileStatus[] recoveryFiles =
fs.listStatus(blobServerPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 53e1d73..025a2ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -85,16 +85,7 @@ public class BlobServerDeleteTest {
fail(e.getMessage());
}
finally {
- if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
- if (server != null) {
- server.shutdown();
- }
+ cleanup(server, client);
}
}
@@ -157,16 +148,7 @@ public class BlobServerDeleteTest {
fail(e.getMessage());
}
finally {
- if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
- if (server != null) {
- server.shutdown();
- }
+ cleanup(server, client);
}
}
@@ -205,16 +187,7 @@ public class BlobServerDeleteTest {
fail(e.getMessage());
}
finally {
- if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
- if (server != null) {
- server.shutdown();
- }
+ cleanup(server, client);
}
}
@@ -254,16 +227,7 @@ public class BlobServerDeleteTest {
fail(e.getMessage());
}
finally {
- if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
- if (server != null) {
- server.shutdown();
- }
+ cleanup(server, client);
}
}
@@ -312,16 +276,20 @@ public class BlobServerDeleteTest {
fail(e.getMessage());
}
finally {
- if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
- if (server != null) {
- server.shutdown();
+ cleanup(server, client);
+ }
+ }
+
+ private void cleanup(BlobServer server, BlobClient client) {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Throwable t) {
+ t.printStackTrace();
}
}
+ if (server != null) {
+ server.shutdown();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index 36ae8cc..ea0eb94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -40,6 +40,7 @@ public class BlobServerRangeTest extends TestLogger {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
BlobServer srv = new BlobServer(conf);
+ srv.shutdown();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index 63ec338..081e28c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -30,6 +30,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
public class BlobUtilsTest {
@@ -55,8 +56,9 @@ public class BlobUtilsTest {
assertTrue(blobUtilsTestDirectory.delete());
}
- @Test(expected = Exception.class)
- public void testExceptionOnCreateStorageDirectoryFailure() {
+ @Test(expected = IOException.class)
+ public void testExceptionOnCreateStorageDirectoryFailure() throws
+ IOException {
// Should throw an Exception
BlobUtils.initStorageDirectory(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS).getAbsolutePath());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index a727d51..d3925be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
public class BlobLibraryCacheRecoveryITCase {
@@ -62,12 +63,12 @@ public class BlobLibraryCacheRecoveryITCase {
BlobCache cache = null;
BlobLibraryCacheManager libCache = null;
- try {
- Configuration config = new Configuration();
- config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
+ Configuration config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
+ try {
for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config);
serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
@@ -144,8 +145,11 @@ public class BlobLibraryCacheRecoveryITCase {
client.delete(keys.get(1));
}
- // Verify everything is clean
- File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
+ // Verify everything is clean below recoveryDir/<cluster_id>
+ final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
+ File haBlobStoreDir = new File(temporaryFolder.getRoot(), clusterId);
+ File[] recoveryFiles = haBlobStoreDir.listFiles();
+ assertNotNull("HA storage directory does not exist", recoveryFiles);
assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
}
finally {