You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/21 09:40:30 UTC

flink git commit: [FLINK-7196][blob] add a TTL to all transient BLOBs

Repository: flink
Updated Branches:
  refs/heads/master e7a2f57ee -> a1d483179


[FLINK-7196][blob] add a TTL to all transient BLOBs

Transient BLOB files should not exist for long and are only deleted manually
from the caches and after access on the server. This uses the BLOB storage's
cleanup interval to set a TTL on all transient BLOB files as a backup cleanup
path. The cleanup task itself runs every cleanupInterval seconds and removes all
transient BLOBs whose TTL has expired, i.e. whose expiry time has passed. This way, a
transient BLOB stays at most 2*cleanupInterval seconds before getting deleted
automatically.

This closes #4381.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1d48317
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1d48317
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1d48317

Branch: refs/heads/master
Commit: a1d48317964df584c66d2cc9d7f539d31fa3d713
Parents: e7a2f57
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Oct 5 15:18:58 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Oct 21 11:40:16 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |  12 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  67 ++++-
 .../flink/runtime/blob/PermanentBlobCache.java  |  10 +-
 .../flink/runtime/blob/TransientBlobCache.java  |  63 ++++-
 .../runtime/blob/TransientBlobCleanupTask.java  | 115 ++++++++
 .../runtime/blob/BlobCacheCleanupTest.java      | 220 ++++++++++------
 .../flink/runtime/blob/BlobCachePutTest.java    |  14 +-
 .../runtime/blob/BlobServerCleanupTest.java     | 264 +++++++++++++++++++
 .../runtime/blob/BlobServerDeleteTest.java      |   2 +-
 .../BlobLibraryCacheManagerTest.java            |   4 +-
 10 files changed, 670 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 64ef48f..af14d5f 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -199,12 +199,14 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user JARs) on the TaskManagers.
 
-- `blob.service.cleanup.interval`: Cleanup interval (in seconds) of the blob caches (DEFAULT: 1 hour).
-Whenever a job is not referenced at the cache anymore, we set a TTL and let the periodic cleanup task
-(executed every `blob.service.cleanup.interval` seconds) remove its blob files after this TTL has passed.
+- `blob.service.cleanup.interval`: Cleanup interval (in seconds) of transient blobs at server and caches as well as permanent blobs at the caches (DEFAULT: 1 hour).
+Whenever a job is not referenced at the cache anymore, we set a TTL for its permanent blob files and
+let the periodic cleanup task (executed every `blob.service.cleanup.interval` seconds) remove them
+after this TTL has passed. We do the same for transient blob files at both server and caches but
+immediately after accessing them, i.e. after a put or get operation.
 This means that a blob will be retained at most <tt>2 * `blob.service.cleanup.interval`</tt> seconds after
-not being referenced anymore. Therefore, a recovery still has the chance to use existing files rather
-than to download them again.
+not being referenced anymore (permanent blobs) or their last access (transient blobs). For permanent blobs,
+this means that a recovery still has the chance to use existing files rather than downloading them again.
 
 - `blob.server.port`: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index bc61ef7..402a739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -46,14 +47,17 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -102,6 +106,23 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 */
 	private final Thread shutdownHook;
 
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Map to store the TTL of each element stored in the local storage, i.e. via one of the {@link
+	 * #getFile} methods.
+	 **/
+	private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes =
+		new ConcurrentHashMap<>();
+
+	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
+	private final long cleanupInterval;
+
+	/**
+	 * Timer task to execute the cleanup at regular intervals.
+	 */
+	private final Timer cleanupTimer;
+
 	/**
 	 * Instantiates a new BLOB server and binds it to a free network port.
 	 *
@@ -141,6 +162,14 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 			backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
 		}
 
+		// Initializing the clean up task
+		this.cleanupTimer = new Timer(true);
+
+		this.cleanupInterval = config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+		this.cleanupTimer
+			.schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(),
+				storageDir, LOG), cleanupInterval, cleanupInterval);
+
 		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 
 		if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
@@ -273,6 +302,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 */
 	@Override
 	public void close() throws IOException {
+		cleanupTimer.cancel();
+
 		if (shutdownRequested.compareAndSet(false, true)) {
 			Exception exception = null;
 
@@ -462,6 +493,15 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 		// assume readWriteLock.readLock() was already locked (cannot really check that)
 
 		if (localFile.exists()) {
+			// update TTL for transient BLOBs:
+			if (blobKey instanceof TransientBlobKey) {
+				// regarding concurrent operations, it is not really important which timestamp makes
+				// it into the map as they are close to each other anyway, also we can simply
+				// overwrite old values as long as we are in the read (or write) lock
+				blobExpiryTimes
+					.put(Tuple2.of(jobId, (TransientBlobKey) blobKey),
+						System.currentTimeMillis() + cleanupInterval);
+			}
 			return;
 		} else if (blobKey instanceof PermanentBlobKey) {
 			// Try the HA blob store
@@ -695,6 +735,13 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 					BlobUtils.moveTempFileToStore(
 						incomingFile, jobId, blobKey, storageFile, LOG,
 						blobKey instanceof PermanentBlobKey ? blobStore : null);
+					// add TTL for transient BLOBs:
+					if (blobKey instanceof TransientBlobKey) {
+						// must be inside read or write lock to add a TTL
+						blobExpiryTimes
+							.put(Tuple2.of(jobId, (TransientBlobKey) blobKey),
+								System.currentTimeMillis() + cleanupInterval);
+					}
 					return blobKey;
 				}
 			} finally {
@@ -769,6 +816,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 				LOG.warn("Failed to locally delete BLOB " + key + " at " + localFile.getAbsolutePath());
 				return false;
 			}
+			// this needs to happen inside the write lock in case of concurrent getFile() calls
+			blobExpiryTimes.remove(Tuple2.of(jobId, key));
 			return true;
 		} finally {
 			readWriteLock.writeLock().unlock();
@@ -797,6 +846,12 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 			boolean deletedLocally = false;
 			try {
 				FileUtils.deleteDirectory(jobDir);
+
+				// NOTE: Instead of going through blobExpiryTimes, keep lingering entries - they
+				//       will be cleaned up by the timer task which tolerates non-existing files
+				//       If inserted again with the same IDs (via put()), the TTL will be updated
+				//       again.
+
 				deletedLocally = true;
 			} catch (IOException e) {
 				LOG.warn("Failed to locally delete BLOB storage directory at " +
@@ -833,6 +888,16 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	}
 
 	/**
+	 * Returns the blob expiry times - for testing purposes only!
+	 *
+	 * @return blob expiry times (internal state!)
+	 */
+	@VisibleForTesting
+	ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> getBlobExpiryTimes() {
+		return blobExpiryTimes;
+	}
+
+	/**
 	 * Tests whether the BLOB server has been requested to shut down.
 	 *
 	 * @return True, if the server has been requested to shut down, false otherwise.

http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
index 63003a2..ae59f59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
@@ -40,9 +40,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup.
  *
- * <p>When requesting BLOBs via {@link #getPermanentFile(JobID, BlobKey)}, the cache will first attempt to
- * serve the file from its local cache. Only if the local cache does not contain the desired BLOB,
- * it will try to download it from a distributed HA file system (if available) or the BLOB server.
+ * <p>When requesting BLOBs via {@link #getFile(JobID, PermanentBlobKey)}, the cache will first
+ * attempt to serve the file from its local cache. Only if the local cache does not contain the
+ * desired BLOB, it will try to download it from a distributed HA file system (if available) or the
+ * BLOB server.
  *
  * <p>If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
  * Files may thus still be be accessible upon recovery and do not need to be re-downloaded.
@@ -76,6 +77,9 @@ public class PermanentBlobCache extends AbstractBlobCache implements PermanentBl
 	 */
 	private final long cleanupInterval;
 
+	/**
+	 * Timer task to execute the cleanup at regular intervals.
+	 */
 	private final Timer cleanupTimer;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
index 941973c..bbd008a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 
 import org.slf4j.LoggerFactory;
@@ -30,6 +32,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -43,6 +48,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class TransientBlobCache extends AbstractBlobCache implements TransientBlobService {
 
 	/**
+	 * Map to store the TTL of each element stored in the local storage, i.e. via one of the {@link
+	 * #getFile} methods.
+	 **/
+	private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes =
+		new ConcurrentHashMap<>();
+
+	/**
+	 * Time interval (ms) to run the cleanup task; also used as the default TTL.
+	 */
+	private final long cleanupInterval;
+
+	/**
+	 * Timer task to execute the cleanup at regular intervals.
+	 */
+	private final Timer cleanupTimer;
+
+	/**
 	 * Instantiates a new BLOB cache.
 	 *
 	 * @param serverAddress
@@ -59,6 +81,14 @@ public class TransientBlobCache extends AbstractBlobCache implements TransientBl
 
 		super(serverAddress, blobClientConfig, new VoidBlobStore(),
 			LoggerFactory.getLogger(TransientBlobCache.class));
+
+		// Initializing the clean up task
+		this.cleanupTimer = new Timer(true);
+
+		this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+		this.cleanupTimer
+			.schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(),
+				storageDir, log), cleanupInterval, cleanupInterval);
 	}
 
 	@Override
@@ -73,6 +103,24 @@ public class TransientBlobCache extends AbstractBlobCache implements TransientBl
 	}
 
 	@Override
+	protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
+		File file = super.getFileInternal(jobId, blobKey);
+
+		readWriteLock.readLock().lock();
+		try {
+			// regarding concurrent operations, it is not really important which timestamp makes
+			// it into the map as they are close to each other anyway, also we can simply
+			// overwrite old values as long as we are in the read (or write) lock
+			blobExpiryTimes.put(Tuple2.of(jobId, (TransientBlobKey) blobKey),
+				System.currentTimeMillis() + cleanupInterval);
+		} finally {
+			readWriteLock.readLock().unlock();
+		}
+
+		return file;
+	}
+
+	@Override
 	public TransientBlobKey putTransient(byte[] value) throws IOException {
 		try (BlobClient bc = new BlobClient(serverAddress, blobClientConfig)) {
 			return (TransientBlobKey) bc.putBuffer(null, value, 0, value.length, TRANSIENT_BLOB);
@@ -134,6 +182,9 @@ public class TransientBlobCache extends AbstractBlobCache implements TransientBl
 				log.warn("Failed to delete locally cached BLOB {} at {}", key,
 					localFile.getAbsolutePath());
 				return false;
+			} else {
+				// this needs to happen inside the write lock in case of concurrent getFile() calls
+				blobExpiryTimes.remove(Tuple2.of(jobId, key));
 			}
 		} finally {
 			readWriteLock.writeLock().unlock();
@@ -142,6 +193,16 @@ public class TransientBlobCache extends AbstractBlobCache implements TransientBl
 	}
 
 	/**
+	 * Returns the blob expiry times - for testing purposes only!
+	 *
+	 * @return blob expiry times (internal state!)
+	 */
+	@VisibleForTesting
+	ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> getBlobExpiryTimes() {
+		return blobExpiryTimes;
+	}
+
+	/**
 	 * Returns a file handle to the file associated with the given blob key on the blob
 	 * server.
 	 *
@@ -166,6 +227,6 @@ public class TransientBlobCache extends AbstractBlobCache implements TransientBl
 
 	@Override
 	protected void cancelCleanupTask() {
-		// nothing to do here
+		cleanupTimer.cancel();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCleanupTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCleanupTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCleanupTask.java
new file mode 100644
index 0000000..5dc7678
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCleanupTask.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Cleanup task for transient BLOBs.
+ */
+class TransientBlobCleanupTask extends TimerTask {
+
+	/**
+	 * The log object used for debugging.
+	 */
+	private final Logger log;
+
+	/**
+	 * Map to store the TTL of each element stored in the local storage.
+	 **/
+	private ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes;
+
+	/**
+	 * Lock to acquire before changing file contents.
+	 */
+	private Lock writeLock;
+
+	/**
+	 * Local storage directory to work on.
+	 */
+	private File storageDir;
+
+	/**
+	 * Creates a new cleanup timer task working with the given parameters from {@link BlobServer}
+	 * and {@link TransientBlobCache}.
+	 *
+	 * @param blobExpiryTimes
+	 * 		map to store the TTL of each element stored in the local storage
+	 * @param writeLock
+	 * 		lock to acquire before changing file contents
+	 * @param storageDir
+	 * 		local storage directory to work on
+	 * @param log
+	 * 		logger instance for debugging
+	 */
+	TransientBlobCleanupTask(
+			ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes, Lock writeLock,
+			File storageDir, Logger log) {
+		this.blobExpiryTimes = checkNotNull(blobExpiryTimes);
+		this.writeLock = checkNotNull(writeLock);
+		this.storageDir = checkNotNull(storageDir);
+		this.log = checkNotNull(log);
+	}
+
+	/**
+	 * Cleans up transient BLOBs whose TTL is up, tolerating that files do not exist (anymore).
+	 */
+	@Override
+	public void run() {
+		// let's cache the current time - we do not operate on a millisecond precision anyway
+		final long currentTimeMillis = System.currentTimeMillis();
+		// iterate through all entries and remove those where the current time is past their expiry
+		Set<Map.Entry<Tuple2<JobID, TransientBlobKey>, Long>> entries = blobExpiryTimes.entrySet();
+		for (Map.Entry<Tuple2<JobID, TransientBlobKey>, Long> entry : entries) {
+			if (currentTimeMillis >= entry.getValue()) {
+				JobID jobId = entry.getKey().f0;
+				BlobKey blobKey = entry.getKey().f1;
+
+				final File localFile =
+					new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, blobKey));
+
+				// deleting the file or changing blobExpiryTimes' contents needs to be protected by the lock
+				writeLock.lock();
+
+				try {
+					if (!localFile.delete() && localFile.exists()) {
+						log.warn("Failed to locally delete blob " + localFile.getAbsolutePath());
+					} else {
+						// this needs to happen inside the write lock in case of concurrent getFile() calls
+						entries.remove(entry);
+					}
+				} finally {
+					writeLock.unlock();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index d6fab50..5ec9b89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Ignore;
@@ -28,23 +31,41 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFileCountForJob;
+import static org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFilesExist;
+import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 /**
- * A few tests for the deferred ref-counting based cleanup inside the {@link PermanentBlobCache}.
+ * A few tests for the cleanup of {@link PermanentBlobCache} and {@link TransientBlobCache}.
  */
 public class BlobCacheCleanupTest extends TestLogger {
 
+	private final Random rnd = new Random();
+
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@@ -52,7 +73,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 	 * Tests that {@link PermanentBlobCache} cleans up after calling {@link PermanentBlobCache#releaseJob(JobID)}.
 	 */
 	@Test
-	public void testJobCleanup() throws IOException, InterruptedException {
+	public void testPermanentBlobCleanup() throws IOException, InterruptedException {
 
 		JobID jobId = new JobID();
 		List<PermanentBlobKey> keys = new ArrayList<>();
@@ -131,7 +152,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 	 * when registering, releasing, and re-registering jobs.
 	 */
 	@Test
-	public void testJobReferences() throws IOException, InterruptedException {
+	public void testPermanentJobReferences() throws IOException, InterruptedException {
 
 		JobID jobId = new JobID();
 
@@ -184,12 +205,13 @@ public class BlobCacheCleanupTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that {@link PermanentBlobCache} cleans up after calling {@link PermanentBlobCache#releaseJob(JobID)}
-	 * but only after preserving the file for a bit longer.
+	 * Tests the deferred cleanup of {@link PermanentBlobCache}, i.e. after calling {@link
+	 * PermanentBlobCache#releaseJob(JobID)} the file should be preserved a bit longer and then
+	 * cleaned up.
 	 */
 	@Test
 	@Ignore("manual test due to stalling: ensures a BLOB is retained first and only deleted after the (long) timeout ")
-	public void testJobDeferredCleanup() throws IOException, InterruptedException {
+	public void testPermanentBlobDeferredCleanup() throws IOException, InterruptedException {
 		// file should be deleted between 5 and 10s after last job release
 		long cleanupInterval = 5L;
 
@@ -278,6 +300,114 @@ public class BlobCacheCleanupTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testTransientBlobNoJobCleanup()
+			throws IOException, InterruptedException, ExecutionException {
+		testTransientBlobCleanup(null);
+	}
+
+	@Test
+	public void testTransientBlobForJobCleanup()
+			throws IOException, InterruptedException, ExecutionException {
+		testTransientBlobCleanup(new JobID());
+	}
+
+	/**
+	 * Tests that {@link TransientBlobCache} cleans up after a default TTL and keeps files which are
+	 * constantly accessed.
+	 */
+	private void testTransientBlobCleanup(@Nullable final JobID jobId)
+			throws IOException, InterruptedException, ExecutionException {
+
+		// 1s should be a safe-enough buffer to still check for existence after a BLOB's last access
+		long cleanupInterval = 1L; // in seconds
+		final int numberConcurrentGetOperations = 3;
+
+		final List<CompletableFuture<Void>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+
+		byte[] data = new byte[2000000];
+		rnd.nextBytes(data);
+		byte[] data2 = Arrays.copyOfRange(data, 10, 54);
+
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+		config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
+
+		long cleanupLowerBound;
+
+		try (
+			BlobServer server = new BlobServer(config, new VoidBlobStore());
+			final BlobCacheService cache = new BlobCacheService(
+				new InetSocketAddress("localhost", server.getPort()), config,
+				new VoidBlobStore())) {
+			ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> transientBlobExpiryTimes =
+				cache.getTransientBlobService().getBlobExpiryTimes();
+
+			server.start();
+
+			final TransientBlobKey key1 =
+				(TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
+			final TransientBlobKey key2 =
+				(TransientBlobKey) put(server, jobId, data2, TRANSIENT_BLOB);
+
+			// access key1, verify expiry times
+			cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
+			verifyContents(cache, jobId, key1, data);
+			final Long key1ExpiryFirstAccess = transientBlobExpiryTimes.get(Tuple2.of(jobId, key1));
+			assertThat(key1ExpiryFirstAccess, greaterThanOrEqualTo(cleanupLowerBound));
+			assertNull(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)));
+
+			// access key2, verify expiry times (delay at least 1ms to also verify key1 expiry is unchanged)
+			Thread.sleep(1);
+			cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
+			verifyContents(cache, jobId, key2, data2);
+			assertEquals(key1ExpiryFirstAccess, transientBlobExpiryTimes.get(Tuple2.of(jobId, key1)));
+			assertThat(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)),
+				greaterThanOrEqualTo(cleanupLowerBound));
+
+			// files are cached now for the given TTL - remove from server so that they are not re-downloaded
+			if (jobId != null) {
+				server.cleanupJob(jobId);
+			} else {
+				server.deleteFromCache(key1);
+				server.deleteFromCache(key2);
+			}
+			checkFileCountForJob(0, jobId, server);
+
+			// cleanup task is run every cleanupInterval seconds
+			// => unaccessed file should remain at most 2*cleanupInterval seconds
+			// (use 3*cleanupInterval to check that we can still access it)
+			final long finishTime = System.currentTimeMillis() + 3 * cleanupInterval;
+
+			final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
+			for (int i = 0; i < numberConcurrentGetOperations; i++) {
+				CompletableFuture<Void> getOperation = CompletableFuture
+					.supplyAsync(
+						() -> {
+							try {
+								// constantly access key1 so this should not get deleted
+								while (System.currentTimeMillis() < finishTime) {
+									get(cache, jobId, key1);
+								}
+
+								return null;
+							} catch (IOException e) {
+								throw new CompletionException(new FlinkException(
+									"Could not retrieve blob.", e));
+							}
+					}, executor);
+
+				getOperations.add(getOperation);
+			}
+
+			FutureUtils.ConjunctFuture<Collection<Void>> filesFuture = FutureUtils.combineAll(getOperations);
+			filesFuture.get();
+
+			verifyDeletedEventually(server, jobId, key1, key2);
+		}
+	}
+
 	/**
 	 * Checks that BLOBs for the given <tt>jobId</tt> are cleaned up eventually (after calling
 	 * {@link PermanentBlobCache#releaseJob(JobID)}, which is not done by this method!) (waits at
@@ -307,78 +437,4 @@ public class BlobCacheCleanupTest extends TestLogger {
 		// this fails if we exited via a timeout
 		checkFileCountForJob(0, jobId, cache);
 	}
-
-	/**
-	 * Checks how many of the files given by blob keys are accessible.
-	 *
-	 * @param jobId
-	 * 		ID of a job
-	 * @param keys
-	 * 		blob keys to check
-	 * @param blobService
-	 * 		BLOB store to use
-	 * @param doThrow
-	 * 		whether exceptions should be ignored (<tt>false</tt>), or thrown (<tt>true</tt>)
-	 *
-	 * @return number of files existing at {@link BlobServer#getStorageLocation(JobID, BlobKey)} and
-	 * {@link PermanentBlobCache#getStorageLocation(JobID, BlobKey)}, respectively
-	 */
-	public static int checkFilesExist(
-			JobID jobId, Collection<? extends BlobKey> keys, PermanentBlobService blobService, boolean doThrow)
-			throws IOException {
-
-		int numFiles = 0;
-
-		for (BlobKey key : keys) {
-			final File blobFile;
-			if (blobService instanceof BlobServer) {
-				BlobServer server = (BlobServer) blobService;
-				blobFile = server.getStorageLocation(jobId, key);
-			} else {
-				PermanentBlobCache cache = (PermanentBlobCache) blobService;
-				blobFile = cache.getStorageLocation(jobId, key);
-			}
-			if (blobFile.exists()) {
-				++numFiles;
-			} else if (doThrow) {
-				throw new IOException("File " + blobFile + " does not exist.");
-			}
-		}
-
-		return numFiles;
-	}
-
-	/**
-	 * Checks how many of the files given by blob keys are accessible.
-	 *
-	 * @param expectedCount
-	 * 		number of expected files in the blob service for the given job
-	 * @param jobId
-	 * 		ID of a job
-	 * @param blobService
-	 * 		BLOB store to use
-	 */
-	public static void checkFileCountForJob(
-		int expectedCount, JobID jobId, PermanentBlobService blobService)
-		throws IOException {
-
-		final File jobDir;
-		if (blobService instanceof BlobServer) {
-			BlobServer server = (BlobServer) blobService;
-			jobDir = server.getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
-		} else {
-			PermanentBlobCache cache = (PermanentBlobCache) blobService;
-			jobDir = cache.getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
-		}
-		File[] blobsForJob = jobDir.listFiles();
-		if (blobsForJob == null) {
-			if (expectedCount != 0) {
-				throw new IOException("File " + jobDir + " does not exist.");
-			}
-		} else {
-			assertEquals("Too many/few files in job dir: " +
-					Arrays.asList(blobsForJob).toString(), expectedCount,
-				blobsForJob.length);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
index 56258c3..264c7cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
@@ -55,11 +55,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Supplier;
 
-import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
+import static org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFilesExist;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.BlockingInputStream;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.ChunkedInputStream;
@@ -909,19 +909,21 @@ public class BlobCachePutTest extends TestLogger {
 	 * 		BLOB server
 	 * @param jobId
 	 * 		job ID or <tt>null</tt> if job-unrelated
-	 * @param key
-	 * 		key identifying the BLOB to request
+	 * @param keys
+	 * 		key(s) identifying the BLOB to request
 	 */
-	static void verifyDeletedEventually(BlobServer server, @Nullable JobID jobId, BlobKey key)
+	static void verifyDeletedEventually(BlobServer server, @Nullable JobID jobId, BlobKey... keys)
 			throws IOException, InterruptedException {
 
 		long deadline = System.currentTimeMillis() + 30_000L;
 		do {
 			Thread.sleep(10);
 		}
-		while (checkFilesExist(jobId, Collections.singletonList(key), server, false) != 0 &&
+		while (checkFilesExist(jobId, Arrays.asList(keys), server, false) != 0 &&
 			System.currentTimeMillis() < deadline);
 
-		verifyDeleted(server, jobId, key);
+		for (BlobKey key : keys) {
+			verifyDeleted(server, jobId, key);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
new file mode 100644
index 0000000..aafba30
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * A few tests for the cleanup of transient BLOBs at the {@link BlobServer}.
+ */
+public class BlobServerCleanupTest extends TestLogger {
+
+	private final Random rnd = new Random();
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Runs the transient BLOB cleanup test for job-unrelated BLOBs (<tt>null</tt> job ID).
+	 */
+	@Test
+	public void testTransientBlobNoJobCleanup()
+			throws IOException, InterruptedException, ExecutionException {
+		testTransientBlobCleanup(null);
+	}
+
+	/**
+	 * Runs the transient BLOB cleanup test for BLOBs belonging to a (new, random) job ID.
+	 */
+	@Test
+	public void testTransientBlobForJobCleanup()
+			throws IOException, InterruptedException, ExecutionException {
+		testTransientBlobCleanup(new JobID());
+	}
+
+	/**
+	 * Tests that the {@link BlobServer} cleans up transient BLOBs after the configured TTL and
+	 * keeps files which are constantly accessed.
+	 *
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 */
+	private void testTransientBlobCleanup(@Nullable final JobID jobId)
+			throws IOException, InterruptedException, ExecutionException {
+
+		// 1s should be a safe-enough buffer to still check for existence after a BLOB's last access
+		long cleanupInterval = 1L; // in seconds
+		// all bounds below are derived from System.currentTimeMillis(), so the interval must be
+		// converted to milliseconds before it is added to a timestamp
+		final long cleanupIntervalMillis = cleanupInterval * 1000L;
+		final int numberConcurrentGetOperations = 3;
+
+		final List<CompletableFuture<Void>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+
+		byte[] data = new byte[2000000];
+		rnd.nextBytes(data);
+		byte[] data2 = Arrays.copyOfRange(data, 10, 54);
+
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+		config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
+
+		long cleanupLowerBound;
+
+		try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
+
+			ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> transientBlobExpiryTimes =
+				server.getBlobExpiryTimes();
+
+			server.start();
+
+			// after put(), files are cached for the given TTL
+			cleanupLowerBound = System.currentTimeMillis() + cleanupIntervalMillis;
+			final TransientBlobKey key1 =
+				(TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
+			final Long key1ExpiryAfterPut = transientBlobExpiryTimes.get(Tuple2.of(jobId, key1));
+			assertThat(key1ExpiryAfterPut, greaterThanOrEqualTo(cleanupLowerBound));
+
+			cleanupLowerBound = System.currentTimeMillis() + cleanupIntervalMillis;
+			final TransientBlobKey key2 =
+				(TransientBlobKey) put(server, jobId, data2, TRANSIENT_BLOB);
+			final Long key2ExpiryAfterPut = transientBlobExpiryTimes.get(Tuple2.of(jobId, key2));
+			assertThat(key2ExpiryAfterPut, greaterThanOrEqualTo(cleanupLowerBound));
+
+			// check that HA contents are not cleaned up
+			final JobID jobIdHA = (jobId == null) ? new JobID() : jobId;
+			final BlobKey keyHA = put(server, jobIdHA, data, PERMANENT_BLOB);
+
+			// access key1, verify expiry times (delay at least 1ms to also verify key2 expiry is unchanged)
+			Thread.sleep(1);
+			cleanupLowerBound = System.currentTimeMillis() + cleanupIntervalMillis;
+			verifyContents(server, jobId, key1, data);
+			final Long key1ExpiryAfterGet = transientBlobExpiryTimes.get(Tuple2.of(jobId, key1));
+			assertThat(key1ExpiryAfterGet, greaterThan(key1ExpiryAfterPut));
+			assertThat(key1ExpiryAfterGet, greaterThanOrEqualTo(cleanupLowerBound));
+			assertEquals(key2ExpiryAfterPut, transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)));
+
+			// access key2, verify expiry times (delay at least 1ms to also verify key1 expiry is unchanged)
+			Thread.sleep(1);
+			cleanupLowerBound = System.currentTimeMillis() + cleanupIntervalMillis;
+			verifyContents(server, jobId, key2, data2);
+			assertEquals(key1ExpiryAfterGet, transientBlobExpiryTimes.get(Tuple2.of(jobId, key1)));
+			assertThat(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)),
+				greaterThan(key2ExpiryAfterPut));
+			assertThat(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2)),
+				greaterThanOrEqualTo(cleanupLowerBound));
+
+			// cleanup task is run every cleanupInterval seconds
+			// => unaccessed file should remain at most 2*cleanupInterval seconds
+			// (use 3*cleanupInterval to check that we can still access it)
+			final long finishTime = System.currentTimeMillis() + 3 * cleanupIntervalMillis;
+
+			final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
+			try {
+				for (int i = 0; i < numberConcurrentGetOperations; i++) {
+					CompletableFuture<Void> getOperation = CompletableFuture
+						.supplyAsync(
+							() -> {
+								try {
+									// constantly access key1 so this should not get deleted
+									while (System.currentTimeMillis() < finishTime) {
+										get(server, jobId, key1);
+									}
+
+									return null;
+								} catch (IOException e) {
+									throw new CompletionException(new FlinkException(
+										"Could not retrieve blob.", e));
+								}
+							}, executor);
+
+					getOperations.add(getOperation);
+				}
+
+				FutureUtils.ConjunctFuture<Collection<Void>> filesFuture = FutureUtils.combineAll(getOperations);
+				filesFuture.get();
+			} finally {
+				// do not leak the pool's worker threads, even if a get operation failed
+				executor.shutdownNow();
+			}
+
+			verifyDeletedEventually(server, jobId, key1, key2);
+
+			// HA content should be unaffected
+			verifyContents(server, jobIdHA, keyHA, data);
+		}
+	}
+
+	/**
+	 * Checks how many of the files given by blob keys are accessible.
+	 *
+	 * @param jobId
+	 * 		ID of a job
+	 * @param keys
+	 * 		blob keys to check
+	 * @param blobService
+	 * 		BLOB store to use (a {@link BlobServer}, {@link PermanentBlobCache} or
+	 * 		{@link TransientBlobCache})
+	 * @param doThrow
+	 * 		whether exceptions should be ignored (<tt>false</tt>), or thrown (<tt>true</tt>)
+	 *
+	 * @return number of files existing at {@link BlobServer#getStorageLocation(JobID, BlobKey)},
+	 * {@link PermanentBlobCache#getStorageLocation(JobID, BlobKey)} or
+	 * {@link TransientBlobCache#getStorageLocation(JobID, BlobKey)}, respectively
+	 *
+	 * @throws IOException
+	 * 		if <tt>doThrow</tt> is set and one of the files does not exist
+	 * @throws UnsupportedOperationException
+	 * 		if a key is checked against a <tt>blobService</tt> of an unsupported class
+	 */
+	public static <T> int checkFilesExist(
+		JobID jobId, Collection<? extends BlobKey> keys, T blobService, boolean doThrow)
+		throws IOException {
+
+		int numFiles = 0;
+
+		for (BlobKey key : keys) {
+			final File blobFile;
+			if (blobService instanceof BlobServer) {
+				BlobServer server = (BlobServer) blobService;
+				blobFile = server.getStorageLocation(jobId, key);
+			} else if (blobService instanceof PermanentBlobCache) {
+				PermanentBlobCache cache = (PermanentBlobCache) blobService;
+				blobFile = cache.getStorageLocation(jobId, key);
+			} else if (blobService instanceof TransientBlobCache) {
+				TransientBlobCache cache = (TransientBlobCache) blobService;
+				blobFile = cache.getStorageLocation(jobId, key);
+			} else {
+				throw new UnsupportedOperationException(
+					"unsupported BLOB service class: " + blobService.getClass().getCanonicalName());
+			}
+			if (blobFile.exists()) {
+				++numFiles;
+			} else if (doThrow) {
+				throw new IOException("File " + blobFile + " does not exist.");
+			}
+		}
+
+		return numFiles;
+	}
+
+	/**
+	 * Checks that the job's directory in the given blob service contains the expected number of
+	 * files.
+	 *
+	 * <p>The directory is derived from the storage location of a fresh {@link PermanentBlobKey}
+	 * for the given job. If it cannot be listed, this only passes for an expected count of
+	 * <tt>0</tt>.
+	 *
+	 * @param expectedCount
+	 * 		number of expected files in the blob service for the given job
+	 * @param jobId
+	 * 		ID of a job
+	 * @param blobService
+	 * 		BLOB store to use (anything but a {@link BlobServer} is cast to
+	 * 		{@link PermanentBlobCache})
+	 *
+	 * @throws IOException
+	 * 		if the job directory cannot be listed although files are expected
+	 */
+	public static void checkFileCountForJob(
+		int expectedCount, JobID jobId, PermanentBlobService blobService)
+		throws IOException {
+
+		final File jobDir;
+		if (blobService instanceof BlobServer) {
+			BlobServer server = (BlobServer) blobService;
+			jobDir = server.getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
+		} else {
+			PermanentBlobCache cache = (PermanentBlobCache) blobService;
+			jobDir = cache.getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
+		}
+		// listFiles() returns null if the directory does not exist (or cannot be read)
+		File[] blobsForJob = jobDir.listFiles();
+		if (blobsForJob == null) {
+			if (expectedCount != 0) {
+				throw new IOException("File " + jobDir + " does not exist.");
+			}
+		} else {
+			assertEquals("Too many/few files in job dir: " +
+					Arrays.asList(blobsForJob).toString(), expectedCount,
+				blobsForJob.length);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index fde21ba..2168034 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -44,10 +44,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
+import static org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFileCountForJob;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1d48317/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index cb5d608..080f743 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -42,8 +42,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
-import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
+import static org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFileCountForJob;
+import static org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFilesExist;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;