You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:00 UTC
[09/47] flink git commit: [FLINK-2805] [blobmanager] Write JARs to
file state backend for recovery
[FLINK-2805] [blobmanager] Write JARs to file state backend for recovery
Move StateBackend enum to top level and org.apache.flink.runtime.state
Abstract blob store in blob server for recovery
This closes #1227.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3a4d1d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3a4d1d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3a4d1d9
Branch: refs/heads/master
Commit: c3a4d1d9f720a1da9697d0bbf48f7a3b1f5851b8
Parents: c2989f2
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Oct 5 14:30:46 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 20 00:16:52 2015 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/blob/BlobServer.java | 105 +++++++++--
.../runtime/blob/BlobServerConnection.java | 52 ++++--
.../apache/flink/runtime/blob/BlobStore.java | 97 ++++++++++
.../apache/flink/runtime/blob/BlobUtils.java | 75 +++++++-
.../flink/runtime/blob/FileSystemBlobStore.java | 186 +++++++++++++++++++
.../flink/runtime/blob/VoidBlobStore.java | 61 ++++++
.../flink/runtime/jobmanager/RecoveryMode.java | 12 +-
.../flink/runtime/blob/BlobRecoveryITCase.java | 159 ++++++++++++++++
.../BlobLibraryCacheRecoveryITCase.java | 176 ++++++++++++++++++
9 files changed, 874 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ef2ef61..d0bed8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,6 +18,14 @@
package org.apache.flink.runtime.blob;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -30,13 +38,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.common.JobID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
@@ -57,12 +59,12 @@ public class BlobServer extends Thread implements BlobService {
/** Indicates whether a shutdown of server component has been requested. */
private final AtomicBoolean shutdownRequested = new AtomicBoolean();
- /** Shutdown hook thread to ensure deletion of the storage directory. */
- private final Thread shutdownHook;
-
/** Is the root directory for file storage */
private final File storageDir;
+ /** Blob store for recovery */
+ private final BlobStore blobStore;
+
/** Set of currently running threads */
private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>();
@@ -70,18 +72,43 @@ public class BlobServer extends Thread implements BlobService {
private final int maxConnections;
/**
+ * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if
+ * the configured recovery mode does not equal {@link RecoveryMode#STANDALONE})
+ */
+ private final Thread shutdownHook;
+
+ /**
* Instantiates a new BLOB server and binds it to a free network port.
- *
+ *
* @throws IOException
* thrown if the BLOB server cannot bind to a free network port
*/
public BlobServer(Configuration config) throws IOException {
+ checkNotNull(config, "Configuration");
+
+ RecoveryMode recoveryMode = RecoveryMode.fromConfig(config);
// configure and create the storage directory
String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
LOG.info("Created BLOB server storage directory {}", storageDir);
+ // No recovery.
+ if (recoveryMode == RecoveryMode.STANDALONE) {
+ this.blobStore = new VoidBlobStore();
+ }
+ // Recovery. Check that everything has been set up correctly. This is not clean, but it's
+ // better to resolve this with some upcoming changes to the state backend setup.
+ else if (config.containsKey(ConfigConstants.STATE_BACKEND) &&
+ config.containsKey(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)) {
+
+ this.blobStore = new FileSystemBlobStore(config);
+ }
+ // Fallback.
+ else {
+ this.blobStore = new VoidBlobStore();
+ }
+
// configure the maximum number of concurrent connections
final int maxConnections = config.getInteger(
ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
@@ -102,8 +129,13 @@ public class BlobServer extends Thread implements BlobService {
backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
}
- // Add shutdown hook to delete storage directory
- this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+ if (recoveryMode == RecoveryMode.STANDALONE) {
+ // Add shutdown hook to delete storage directory
+ this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+ }
+ else {
+ this.shutdownHook = null;
+ }
// start the server
try {
@@ -132,37 +164,43 @@ public class BlobServer extends Thread implements BlobService {
* Returns a file handle to the file associated with the given blob key on the blob
* server.
*
+ * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
+ *
* @param key identifying the file
* @return file handle to the file
*/
- public File getStorageLocation(BlobKey key) {
+ File getStorageLocation(BlobKey key) {
return BlobUtils.getStorageLocation(storageDir, key);
}
/**
* Returns a file handle to the file identified by the given jobID and key.
*
+ * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
+ *
* @param jobID to which the file is associated
* @param key to identify the file within the job context
* @return file handle to the file
*/
- public File getStorageLocation(JobID jobID, String key) {
+ File getStorageLocation(JobID jobID, String key) {
return BlobUtils.getStorageLocation(storageDir, jobID, key);
}
/**
* Method which deletes all files associated with the given jobID.
*
+ * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
+ *
* @param jobID all files associated to this jobID will be deleted
* @throws IOException
*/
- public void deleteJobDirectory(JobID jobID) throws IOException {
+ void deleteJobDirectory(JobID jobID) throws IOException {
BlobUtils.deleteJobDirectory(storageDir, jobID);
}
/**
* Returns a temporary file inside the BLOB server's incoming directory.
- *
+ *
* @return a temporary file inside the BLOB server's incoming directory
*/
File createTemporaryFilename() {
@@ -170,6 +208,13 @@ public class BlobServer extends Thread implements BlobService {
String.format("temp-%08d", tempFileCounter.getAndIncrement()));
}
+ /**
+ * Returns the blob store.
+ */
+ BlobStore getBlobStore() {
+ return blobStore;
+ }
+
@Override
public void run() {
try {
@@ -245,6 +290,9 @@ public class BlobServer extends Thread implements BlobService {
LOG.error("BLOB server failed to properly clean up its storage directory.");
}
+ // Clean up the recovery directory
+ blobStore.cleanUp();
+
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
// shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
@@ -282,11 +330,26 @@ public class BlobServer extends Thread implements BlobService {
final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
- if (!localFile.exists()) {
- throw new FileNotFoundException("File " + localFile.getCanonicalPath() + " does not exist.");
- } else {
+ if (localFile.exists()) {
return localFile.toURI().toURL();
}
+ else {
+ try {
+ // Try the blob store
+ blobStore.get(requiredBlob, localFile);
+ }
+ catch (Exception e) {
+ throw new IOException("Failed to copy from blob store.", e);
+ }
+
+ if (localFile.exists()) {
+ return localFile.toURI().toURL();
+ }
+ else {
+ throw new FileNotFoundException("Local file " + localFile + " does not exist " +
+ "and failed to copy from blob store.");
+ }
+ }
}
/**
@@ -305,6 +368,8 @@ public class BlobServer extends Thread implements BlobService {
LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
}
}
+
+ blobStore.delete(key);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 793a9d6..d7bba8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -18,6 +18,12 @@
package org.apache.flink.runtime.blob;
+import com.google.common.io.Files;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
@@ -29,28 +35,21 @@ import java.net.Socket;
import java.net.SocketException;
import java.security.MessageDigest;
-import com.google.common.io.Files;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
-
/**
* A BLOB connection handles a series of requests from a particular BLOB client.
*/
@@ -181,10 +180,18 @@ class BlobServerConnection extends Thread {
JobID jobID = JobID.fromByteArray(jidBytes);
String key = readKey(buf, inputStream);
blobFile = this.blobServer.getStorageLocation(jobID, key);
+
+ if (!blobFile.exists()) {
+ blobServer.getBlobStore().get(jobID, key, blobFile);
+ }
}
else if (contentAddressable == CONTENT_ADDRESSABLE) {
final BlobKey key = BlobKey.readFromInputStream(inputStream);
blobFile = blobServer.getStorageLocation(key);
+
+ if (!blobFile.exists()) {
+ blobServer.getBlobStore().get(key, blobFile);
+ }
}
else {
throw new IOException("Unknown type of BLOB addressing.");
@@ -194,6 +201,7 @@ class BlobServerConnection extends Thread {
if (!blobFile.exists()) {
throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
}
+
if (blobFile.length() > Integer.MAX_VALUE) {
throw new IOException("BLOB size exceeds the maximum size (2 GB).");
}
@@ -220,8 +228,7 @@ class BlobServerConnection extends Thread {
int blobLen = (int) blobFile.length();
writeLength(blobLen, outputStream);
- FileInputStream fis = new FileInputStream(blobFile);
- try {
+ try (FileInputStream fis = new FileInputStream(blobFile)) {
int bytesRemaining = blobLen;
while (bytesRemaining > 0) {
int read = fis.read(buf);
@@ -231,8 +238,6 @@ class BlobServerConnection extends Thread {
outputStream.write(buf, 0, read);
bytesRemaining -= read;
}
- } finally {
- fis.close();
}
}
catch (SocketException e) {
@@ -314,6 +319,9 @@ class BlobServerConnection extends Thread {
File storageFile = this.blobServer.getStorageLocation(jobID, key);
Files.move(incomingFile, storageFile);
incomingFile = null;
+
+ blobServer.getBlobStore().put(storageFile, jobID, key);
+
outputStream.write(RETURN_OKAY);
}
else {
@@ -322,6 +330,8 @@ class BlobServerConnection extends Thread {
Files.move(incomingFile, storageFile);
incomingFile = null;
+ blobServer.getBlobStore().put(storageFile, blobKey);
+
// Return computed key to client for validation
outputStream.write(RETURN_OKAY);
blobKey.writeToOutputStream(outputStream);
@@ -379,6 +389,8 @@ class BlobServerConnection extends Thread {
if (blobFile.exists() && !blobFile.delete()) {
throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
}
+
+ blobServer.getBlobStore().delete(key);
}
else if (type == NAME_ADDRESSABLE) {
byte[] jidBytes = new byte[JobID.SIZE];
@@ -391,6 +403,8 @@ class BlobServerConnection extends Thread {
if (blobFile.exists() && !blobFile.delete()) {
throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
}
+
+ blobServer.getBlobStore().delete(jobID, key);
}
else if (type == JOB_ID_SCOPE) {
byte[] jidBytes = new byte[JobID.SIZE];
@@ -398,6 +412,8 @@ class BlobServerConnection extends Thread {
JobID jobID = JobID.fromByteArray(jidBytes);
blobServer.deleteJobDirectory(jobID);
+
+ blobServer.getBlobStore().deleteAll(jobID);
}
else {
throw new IOException("Unrecognized addressing type: " + type);
http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
new file mode 100644
index 0000000..1e72d91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+
+/**
+ * A blob store, used by the BLOB server to persist BLOBs for recovery.
+ */
+interface BlobStore {
+
+ /**
+ * Copies the local file to the blob store.
+ *
+ * @param localFile The file to copy
+ * @param blobKey The ID for the file in the blob store
+ * @throws Exception If the copy fails
+ */
+ void put(File localFile, BlobKey blobKey) throws Exception;
+
+ /**
+ * Copies a local file to the blob store.
+ *
+ * <p>The job ID and key make up a composite key for the file.
+ *
+ * @param localFile The file to copy
+ * @param jobId The JobID part of the ID for the file in the blob store
+ * @param key The String part of the ID for the file in the blob store
+ * @throws Exception If the copy fails
+ */
+ void put(File localFile, JobID jobId, String key) throws Exception;
+
+ /**
+ * Copies a blob to a local file.
+ *
+ * @param blobKey The blob ID
+ * @param localFile The local file to copy to
+ * @throws Exception If the copy fails
+ */
+ void get(BlobKey blobKey, File localFile) throws Exception;
+
+ /**
+ * Copies a blob to a local file.
+ *
+ * @param jobId The JobID part of the ID for the blob
+ * @param key The String part of the ID for the blob
+ * @param localFile The local file to copy to
+ * @throws Exception If the copy fails
+ */
+ void get(JobID jobId, String key, File localFile) throws Exception;
+
+ /**
+ * Deletes a blob.
+ *
+ * @param blobKey The blob ID
+ */
+ void delete(BlobKey blobKey);
+
+ /**
+ * Deletes a blob.
+ *
+ * @param jobId The JobID part of the ID for the blob
+ * @param key The String part of the ID for the blob
+ */
+ void delete(JobID jobId, String key);
+
+ /**
+ * Deletes blobs.
+ *
+ * @param jobId The JobID part of all blobs to delete
+ */
+ void deleteAll(JobID jobId);
+
+ /**
+ * Cleans up the store and deletes all blobs.
+ */
+ void cleanUp();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index c47ecf2..d8f744b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -21,14 +21,19 @@ package org.apache.flink.runtime.blob;
import com.google.common.io.BaseEncoding;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.IOUtils;
import org.slf4j.Logger;
import java.io.EOFException;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
+import java.net.URI;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -49,12 +54,12 @@ public class BlobUtils {
/**
* The prefix of all BLOB files stored by the BLOB server.
*/
- private static final String BLOB_FILE_PREFIX = "blob_";
+ static final String BLOB_FILE_PREFIX = "blob_";
/**
* The prefix of all job-specific directories created by the BLOB server.
*/
- private static final String JOB_DIR_PREFIX = "job_";
+ static final String JOB_DIR_PREFIX = "job_";
/**
* The default character set to translate between characters and bytes.
@@ -103,7 +108,7 @@ public class BlobUtils {
static File getIncomingDirectory(File storageDir) {
final File incomingDir = new File(storageDir, "incoming");
- if (!incomingDir.exists() && !incomingDir.mkdir()) {
+ if (!incomingDir.exists() && !incomingDir.mkdirs()) {
throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath());
}
@@ -119,7 +124,7 @@ public class BlobUtils {
private static File getCacheDirectory(File storageDir) {
final File cacheDirectory = new File(storageDir, "cache");
- if (!cacheDirectory.exists() && !cacheDirectory.mkdir()) {
+ if (!cacheDirectory.exists() && !cacheDirectory.mkdirs()) {
throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'.");
}
@@ -174,7 +179,7 @@ public class BlobUtils {
* the user's key for a BLOB
* @return the internal name for the BLOB as used by the BLOB server
*/
- private static String encodeKey(String key) {
+ static String encodeKey(String key) {
return BaseEncoding.base64().encode(key.getBytes(DEFAULT_CHARSET));
}
@@ -327,6 +332,66 @@ public class BlobUtils {
}
/**
+ * Returns the path for the given blob key.
+ *
+ * <p>The returned path can be used with the state backend for recovery purposes.
+ *
+ * <p>This follows the same scheme as {@link #getStorageLocation(File, BlobKey)}.
+ */
+ static String getRecoveryPath(String basePath, BlobKey blobKey) {
+ // format: $base/cache/blob_$key
+ return String.format("%s/cache/%s", basePath, BLOB_FILE_PREFIX + blobKey.toString());
+ }
+
+ /**
+ * Returns the path for the given job ID and key.
+ *
+ * <p>The returned path can be used with the state backend for recovery purposes.
+ *
+ * <p>This follows the same scheme as {@link #getStorageLocation(File, JobID, String)}.
+ */
+ static String getRecoveryPath(String basePath, JobID jobId, String key) {
+ // format: $base/job_$id/blob_$key
+ return String.format("%s/%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString(),
+ BLOB_FILE_PREFIX + encodeKey(key));
+ }
+
+ /**
+ * Returns the path for the given job ID.
+ *
+ * <p>The returned path can be used with the state backend for recovery purposes.
+ */
+ static String getRecoveryPath(String basePath, JobID jobId) {
+ return String.format("%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString());
+ }
+
+ /**
+ * Copies the file from the recovery path to the local file.
+ */
+ static void copyFromRecoveryPath(String recoveryPath, File localBlobFile) throws Exception {
+ if (recoveryPath == null) {
+ throw new IllegalStateException("Failed to determine recovery path.");
+ }
+
+ if (!localBlobFile.createNewFile()) {
+ throw new IllegalStateException("Failed to create new local file to copy to");
+ }
+
+ URI uri = new URI(recoveryPath);
+ Path path = new Path(recoveryPath);
+
+ if (FileSystem.get(uri).exists(path)) {
+ try (InputStream is = FileSystem.get(uri).open(path)) {
+ FileOutputStream fos = new FileOutputStream(localBlobFile);
+ IOUtils.copyBytes(is, fos); // closes the streams
+ }
+ }
+ else {
+ throw new IOException("Cannot find required BLOB at '" + recoveryPath + "' for recovery.");
+ }
+ }
+
+ /**
* Private constructor to prevent instantiation.
*/
private BlobUtils() {
http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
new file mode 100644
index 0000000..8a037ad
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import com.google.common.io.Files;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Blob store backed by {@link FileSystem}.
+ */
+class FileSystemBlobStore implements BlobStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);
+
+ /** The base path of the blob store */
+ private final String basePath;
+
+ FileSystemBlobStore(Configuration config) throws IOException {
+ StateBackend stateBackend = StateBackend.fromConfig(config);
+
+ if (stateBackend == StateBackend.FILESYSTEM) {
+ String stateBackendBasePath = config.getString(
+ ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+
+ if (stateBackendBasePath.equals("")) {
+ throw new IllegalConfigurationException(String.format("Missing configuration for " +
+ "file system state backend recovery path. Please specify via " +
+ "'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+ }
+
+ stateBackendBasePath += "/blob";
+
+ this.basePath = stateBackendBasePath;
+
+ try {
+ FileSystem.get(new URI(basePath)).mkdirs(new Path(basePath));
+ }
+ catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ LOG.info("Created blob directory {}.", basePath);
+ }
+ else {
+ // Nothing else is supported at the moment
+ throw new IllegalConfigurationException(
+ String.format("Illegal state backend " +
+ "configuration '%s'. Please configure '%s' as state " +
+ "backend and specify the recovery path via '%s' key.",
+ stateBackend, StateBackend.FILESYSTEM,
+ ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+ }
+ }
+
+ // - Put ------------------------------------------------------------------
+
+ @Override
+ public void put(File localFile, BlobKey blobKey) throws Exception {
+ put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey));
+ }
+
+ @Override
+ public void put(File localFile, JobID jobId, String key) throws Exception {
+ put(localFile, BlobUtils.getRecoveryPath(basePath, jobId, key));
+ }
+
+ private void put(File fromFile, String toBlobPath) throws Exception {
+ try (OutputStream os = FileSystem.get(new URI(toBlobPath))
+ .create(new Path(toBlobPath), true)) {
+
+ LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
+ Files.copy(fromFile, os);
+ }
+ }
+
+ // - Get ------------------------------------------------------------------
+
+ @Override
+ public void get(BlobKey blobKey, File localFile) throws Exception {
+ get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile);
+ }
+
+ @Override
+ public void get(JobID jobId, String key, File localFile) throws Exception {
+ get(BlobUtils.getRecoveryPath(basePath, jobId, key), localFile);
+ }
+
+ private void get(String fromBlobPath, File toFile) throws Exception {
+ checkNotNull(fromBlobPath, "Blob path");
+ checkNotNull(toFile, "File");
+
+ if (!toFile.exists() && !toFile.createNewFile()) {
+ throw new IllegalStateException("Failed to create target file to copy to");
+ }
+
+ final URI fromUri = new URI(fromBlobPath);
+ final Path fromPath = new Path(fromBlobPath);
+
+ if (FileSystem.get(fromUri).exists(fromPath)) {
+ try (InputStream is = FileSystem.get(fromUri).open(fromPath)) {
+ FileOutputStream fos = new FileOutputStream(toFile);
+
+ LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
+ IOUtils.copyBytes(is, fos); // closes the streams
+ }
+ }
+ else {
+ throw new IOException(fromBlobPath + " does not exist.");
+ }
+ }
+
+ // - Delete ---------------------------------------------------------------
+
+ @Override
+ public void delete(BlobKey blobKey) {
+ delete(BlobUtils.getRecoveryPath(basePath, blobKey));
+ }
+
+ @Override
+ public void delete(JobID jobId, String key) {
+ delete(BlobUtils.getRecoveryPath(basePath, jobId, key));
+ }
+
+ @Override
+ public void deleteAll(JobID jobId) {
+ delete(BlobUtils.getRecoveryPath(basePath, jobId));
+ }
+
+ private void delete(String blobPath) {
+ try {
+ LOG.debug("Deleting {}.", blobPath);
+
+ FileSystem.get(new URI(blobPath)).delete(new Path(blobPath), true);
+ }
+ catch (Exception e) {
+ LOG.warn("Failed to delete blob at " + blobPath);
+ }
+ }
+
+ @Override
+ public void cleanUp() {
+ try {
+ LOG.debug("Cleaning up {}.", basePath);
+
+ FileSystem.get(new URI(basePath)).delete(new Path(basePath), true);
+ }
+ catch (Exception e) {
+ LOG.error("Failed to clean up recovery directory.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
new file mode 100644
index 0000000..1b71add
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+
+/**
+ * A blob store doing nothing.
+ */
+class VoidBlobStore implements BlobStore {
+
+ @Override
+ public void put(File localFile, BlobKey blobKey) throws Exception {
+ }
+
+ @Override
+ public void put(File localFile, JobID jobId, String key) throws Exception {
+ }
+
+ @Override
+ public void get(BlobKey blobKey, File localFile) throws Exception {
+ }
+
+ @Override
+ public void get(JobID jobId, String key, File localFile) throws Exception {
+ }
+
+ @Override
+ public void delete(BlobKey blobKey) {
+ }
+
+ @Override
+ public void delete(JobID jobId, String key) {
+ }
+
+ @Override
+ public void deleteAll(JobID jobId) {
+ }
+
+ @Override
+ public void cleanUp() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
index 17322d8..077e34d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
@@ -24,11 +24,11 @@ import org.apache.flink.configuration.Configuration;
/**
* Recovery mode for Flink's cluster execution. Currently supported modes are:
*
- * - Standalone: No recovery from JobManager failures
- * - ZooKeeper: JobManager high availability via ZooKeeper
- * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
- * is responsible for the job execution. Upon failure of the leader a new leader is elected
- * which will take over the responsibilities of the old leader
+ * - Standalone: No recovery from JobManager failures
+ * - ZooKeeper: JobManager high availability via ZooKeeper
+ * ZooKeeper is used to select a leader among a group of JobManagers. The elected JobManager
+ * is responsible for the job execution. Upon failure of the leader a new leader is elected
+ * which will take over the responsibilities of the old leader
*/
public enum RecoveryMode {
STANDALONE,
@@ -69,4 +69,4 @@ public enum RecoveryMode {
return false;
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
new file mode 100644
index 0000000..0e324a8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test checking that data uploaded through one {@link BlobServer} can be downloaded
+ * from a second {@link BlobServer} that is configured with the same recovery directory.
+ */
+public class BlobRecoveryITCase {
+
+	/** Offset into the random payload at which the partial uploads start. */
+	private static final int PARTIAL_OFFSET = 32;
+
+	/** Number of payload bytes sent by the partial uploads. */
+	private static final int PARTIAL_LENGTH = 256;
+
+	/** Directory handed to both servers as file system recovery path. */
+	private File recoveryDir;
+
+	/** Creates the recovery directory below the system temp directory if it does not exist yet. */
+	@Before
+	public void setUp() throws Exception {
+		recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
+		if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+			throw new IllegalStateException("Failed to create temp directory for test");
+		}
+	}
+
+	/** Removes the recovery directory and everything left in it. */
+	@After
+	public void cleanUp() throws Exception {
+		if (recoveryDir != null) {
+			FileUtils.deleteDirectory(recoveryDir);
+		}
+	}
+
+	/**
+	 * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+	 * participating BlobServer.
+	 */
+	@Test
+	public void testBlobServerRecovery() throws Exception {
+		Random rand = new Random();
+
+		BlobServer[] server = new BlobServer[2];
+		InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+			config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+			for (int i = 0; i < server.length; i++) {
+				server[i] = new BlobServer(config);
+				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
+			}
+
+			client = new BlobClient(serverAddress[0]);
+
+			// Random data
+			byte[] expected = new byte[1024];
+			rand.nextBytes(expected);
+
+			BlobKey[] keys = new BlobKey[2];
+
+			// Put data
+			keys[0] = client.put(expected); // Request 1
+			keys[1] = client.put(expected, PARTIAL_OFFSET, PARTIAL_LENGTH); // Request 2
+
+			JobID[] jobId = new JobID[] { new JobID(), new JobID() };
+			String[] testKey = new String[] { "test-key-1", "test-key-2" };
+
+			client.put(jobId[0], testKey[0], expected); // Request 3
+			client.put(jobId[1], testKey[1], expected, PARTIAL_OFFSET, PARTIAL_LENGTH); // Request 4
+
+			// Close the client and connect to the other server
+			client.close();
+			client = new BlobClient(serverAddress[1]);
+
+			// Verify request 1
+			try (InputStream is = client.get(keys[0])) {
+				assertContentEquals(is, expected, 0, expected.length);
+			}
+
+			// Verify request 2
+			try (InputStream is = client.get(keys[1])) {
+				assertContentEquals(is, expected, PARTIAL_OFFSET, PARTIAL_LENGTH);
+			}
+
+			// Verify request 3
+			try (InputStream is = client.get(jobId[0], testKey[0])) {
+				assertContentEquals(is, expected, 0, expected.length);
+			}
+
+			// Verify request 4
+			try (InputStream is = client.get(jobId[1], testKey[1])) {
+				assertContentEquals(is, expected, PARTIAL_OFFSET, PARTIAL_LENGTH);
+			}
+		}
+		finally {
+			for (BlobServer s : server) {
+				if (s != null) {
+					s.shutdown();
+				}
+			}
+
+			if (client != null) {
+				client.close();
+			}
+		}
+
+		// Verify everything is clean
+		File[] recoveryFiles = recoveryDir.listFiles();
+		assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
+	}
+
+	/**
+	 * Reads {@code length} bytes from the stream and compares all of them with
+	 * {@code expected[offset .. offset + length)}.
+	 *
+	 * <p>The previous inline loops for the partial uploads started at index 32 but stopped at
+	 * index 256, so the last 32 downloaded bytes were never compared.
+	 *
+	 * @param is stream to read the downloaded data from
+	 * @param expected payload that was uploaded
+	 * @param offset index in {@code expected} of the first uploaded byte
+	 * @param length number of uploaded bytes
+	 */
+	private static void assertContentEquals(
+			InputStream is, byte[] expected, int offset, int length) throws Exception {
+
+		byte[] actual = new byte[length];
+		BlobUtils.readFully(is, actual, 0, length, null);
+
+		for (int i = 0; i < length; i++) {
+			assertEquals("Mismatch at byte " + i, expected[offset + i], actual[i]);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
new file mode 100644
index 0000000..4df8afb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test checking that libraries uploaded to one {@link BlobServer} can be fetched
+ * through a {@link BlobLibraryCacheManager} that is connected to a second {@link BlobServer}
+ * configured with the same recovery directory.
+ */
+public class BlobLibraryCacheRecoveryITCase {
+
+	/** Offset into the random payload at which the partial upload starts. */
+	private static final int PARTIAL_OFFSET = 32;
+
+	/** Number of payload bytes sent by the partial upload. */
+	private static final int PARTIAL_LENGTH = 256;
+
+	/** Directory handed to both servers as file system recovery path. */
+	private File recoveryDir;
+
+	/** Creates the recovery directory below the system temp directory if it does not exist yet. */
+	@Before
+	public void setUp() throws Exception {
+		// Use a directory name of its own: sharing "BlobRecoveryITCaseDir" with
+		// BlobRecoveryITCase lets the two tests delete or pollute each other's recovery state
+		// when they run at the same time.
+		recoveryDir = new File(FileUtils.getTempDirectory(), "BlobLibraryCacheRecoveryITCaseDir");
+		if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+			throw new IllegalStateException("Failed to create temp directory for test");
+		}
+	}
+
+	/** Removes the recovery directory and everything left in it. */
+	@After
+	public void cleanUp() throws Exception {
+		if (recoveryDir != null) {
+			FileUtils.deleteDirectory(recoveryDir);
+		}
+	}
+
+	/**
+	 * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+	 * participating BlobLibraryCacheManager.
+	 */
+	@Test
+	public void testRecoveryRegisterAndDownload() throws Exception {
+		Random rand = new Random();
+
+		BlobServer[] server = new BlobServer[2];
+		InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+		BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
+		BlobCache cache = null;
+		BlobLibraryCacheManager libCache = null;
+
+		try {
+			Configuration config = new Configuration();
+			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+			config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+			for (int i = 0; i < server.length; i++) {
+				server[i] = new BlobServer(config);
+				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
+				libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000);
+			}
+
+			// Random data
+			byte[] expected = new byte[1024];
+			rand.nextBytes(expected);
+
+			List<BlobKey> keys = new ArrayList<>(2);
+
+			// Upload some data (libraries)
+			try (BlobClient client = new BlobClient(serverAddress[0])) {
+				keys.add(client.put(expected)); // Request 1
+				keys.add(client.put(expected, PARTIAL_OFFSET, PARTIAL_LENGTH)); // Request 2
+			}
+
+			// The cache
+			cache = new BlobCache(serverAddress[0], config);
+			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
+
+			// Register uploaded libraries
+			JobID jobId = new JobID();
+			ExecutionAttemptID executionId = new ExecutionAttemptID();
+			libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
+
+			// Verify key 1
+			assertFileContents(libCache.getFile(keys.get(0)), expected, 0, expected.length);
+
+			// Shutdown cache and start with other server
+			cache.shutdown();
+			libCache.shutdown();
+
+			cache = new BlobCache(serverAddress[1], config);
+			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
+
+			// Verify key 1
+			assertFileContents(libCache.getFile(keys.get(0)), expected, 0, expected.length);
+
+			// Verify key 2
+			assertFileContents(libCache.getFile(keys.get(1)), expected, PARTIAL_OFFSET, PARTIAL_LENGTH);
+		}
+		finally {
+			for (BlobServer s : server) {
+				if (s != null) {
+					s.shutdown();
+				}
+			}
+
+			// Same treatment as cache/libCache below: the managers wrapping the servers are
+			// shut down as well so that they do not outlive the test.
+			for (BlobLibraryCacheManager manager : libServer) {
+				if (manager != null) {
+					manager.shutdown();
+				}
+			}
+
+			if (cache != null) {
+				cache.shutdown();
+			}
+
+			if (libCache != null) {
+				libCache.shutdown();
+			}
+		}
+
+		// Verify everything is clean
+		File[] recoveryFiles = recoveryDir.listFiles();
+		assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
+	}
+
+	/**
+	 * Asserts that the file is exactly {@code length} bytes long and that its content equals
+	 * {@code expected[offset .. offset + length)}.
+	 *
+	 * @param file file to check
+	 * @param expected payload that was uploaded
+	 * @param offset index in {@code expected} of the first uploaded byte
+	 * @param length number of uploaded bytes
+	 */
+	private static void assertFileContents(
+			File file, byte[] expected, int offset, int length) throws Exception {
+
+		assertEquals(length, file.length());
+
+		try (FileInputStream fis = new FileInputStream(file)) {
+			// Read exactly 'length' bytes instead of looping on available(), which could end
+			// the comparison early without failing.
+			for (int i = 0; i < length; i++) {
+				assertEquals("Mismatch at byte " + i, expected[offset + i], (byte) fis.read());
+			}
+
+			// The file must end right after the expected content.
+			assertEquals(-1, fis.read());
+		}
+	}
+}