You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/16 07:28:40 UTC

flink git commit: [FLINK-7140][blob] add an additional random component into the BlobKey

Repository: flink
Updated Branches:
  refs/heads/master b2b94632d -> f853f3359


[FLINK-7140][blob] add an additional random component into the BlobKey

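Every uploaded BLOB now gets its own key, consisting of the content hash plus
a random component, so that two uploads of identical content no longer share
a key. This should guard us from uploading (and deleting) the same file more
than once and also from hash collisions.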

This closes #4359.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f853f335
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f853f335
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f853f335

Branch: refs/heads/master
Commit: f853f33593373f75a72351f5256564533bc063f9
Parents: b2b9463
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Oct 5 09:33:27 2017 +0200
Committer: Till <ti...@gmail.com>
Committed: Mon Oct 16 09:23:37 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/blob/AbstractBlobCache.java   |  18 +++-
 .../apache/flink/runtime/blob/BlobClient.java   |  12 ++-
 .../org/apache/flink/runtime/blob/BlobKey.java  | 100 +++++++++++++++++--
 .../apache/flink/runtime/blob/BlobServer.java   |  66 +++++++++---
 .../runtime/blob/BlobServerConnection.java      |  18 ++--
 .../apache/flink/runtime/blob/BlobUtils.java    |  12 +--
 .../flink/runtime/blob/PermanentBlobKey.java    |  12 +++
 .../flink/runtime/blob/TransientBlobKey.java    |  12 +++
 .../handler/legacy/TaskManagerLogHandler.java   |   3 +
 .../flink/runtime/blob/BlobCacheDeleteTest.java |  24 +++--
 .../flink/runtime/blob/BlobCacheGetTest.java    |  33 +++---
 .../flink/runtime/blob/BlobCachePutTest.java    |  37 +++++--
 .../runtime/blob/BlobCacheRecoveryTest.java     |  12 +--
 .../flink/runtime/blob/BlobClientTest.java      |  53 +++++-----
 .../apache/flink/runtime/blob/BlobKeyTest.java  |  84 +++++++++++++---
 .../runtime/blob/BlobServerDeleteTest.java      |  21 ++--
 .../flink/runtime/blob/BlobServerGetTest.java   |  19 ++--
 .../flink/runtime/blob/BlobServerPutTest.java   |  33 ++++--
 .../runtime/blob/BlobServerRecoveryTest.java    |   7 +-
 19 files changed, 405 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
index dc031e0..729ac9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -159,8 +159,13 @@ public abstract class AbstractBlobCache implements Closeable {
 			try {
 				if (blobView.get(jobId, blobKey, incomingFile)) {
 					// now move the temp file to our local cache atomically
-					BlobUtils.moveTempFileToStore(
-						incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
+					readWriteLock.writeLock().lock();
+					try {
+						BlobUtils.moveTempFileToStore(
+							incomingFile, jobId, blobKey, localFile, log, null);
+					} finally {
+						readWriteLock.writeLock().unlock();
+					}
 
 					return localFile;
 				}
@@ -172,8 +177,13 @@ public abstract class AbstractBlobCache implements Closeable {
 			BlobClient.downloadFromBlobServer(
 				jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries);
 
-			BlobUtils.moveTempFileToStore(
-				incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
+			readWriteLock.writeLock().lock();
+			try {
+				BlobUtils.moveTempFileToStore(
+					incomingFile, jobId, blobKey, localFile, log, null);
+			} finally {
+				readWriteLock.writeLock().unlock();
+			}
 
 			return localFile;
 		} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 3154f69..fbcce58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -47,9 +47,11 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.security.MessageDigest;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
@@ -57,7 +59,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CON
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
@@ -496,13 +497,16 @@ public final class BlobClient implements Closeable {
 		else if (response == RETURN_OKAY) {
 
 			BlobKey remoteKey = BlobKey.readFromInputStream(is);
-			BlobKey localKey = BlobKey.createKey(blobType, md.digest());
+			byte[] localHash = md.digest();
 
-			if (!localKey.equals(remoteKey)) {
+			if (blobType != remoteKey.getType()) {
+				throw new IOException("Detected data corruption during transfer");
+			}
+			if (!Arrays.equals(localHash, remoteKey.getHash())) {
 				throw new IOException("Detected data corruption during transfer");
 			}
 
-			return localKey;
+			return remoteKey;
 		}
 		else if (response == RETURN_ERROR) {
 			Throwable cause = readExceptionFromStream(is);

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index 0aa45e1..ef2d64d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.StringUtils;
 
 import java.io.EOFException;
@@ -41,7 +42,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	private static final long serialVersionUID = 3847117712521785209L;
 
 	/** Size of the internal BLOB key in bytes. */
-	private static final int SIZE = 20;
+	public static final int SIZE = 20;
 
 	/** The byte buffer storing the actual key data. */
 	private final byte[] key;
@@ -68,6 +69,11 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	}
 
 	/**
+	 * Random component of the key.
+	 */
+	private final AbstractID random;
+
+	/**
 	 * Constructs a new BLOB key.
 	 *
 	 * @param type
@@ -76,6 +82,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	protected BlobKey(BlobType type) {
 		this.type = checkNotNull(type);
 		this.key = new byte[SIZE];
+		this.random = new AbstractID();
 	}
 
 	/**
@@ -87,13 +94,33 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	 *        the actual key data
 	 */
 	protected BlobKey(BlobType type, byte[] key) {
+		if (key == null || key.length != SIZE) {
+			throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
+		}
+
 		this.type = checkNotNull(type);
+		this.key = key;
+		this.random = new AbstractID();
+	}
 
+	/**
+	 * Constructs a new BLOB key from the given byte array.
+	 *
+	 * @param type
+	 * 		whether the referenced BLOB is permanent or transient
+	 * @param key
+	 *        the actual key data
+	 * @param random
+	 *        the random component of the key
+	 */
+	protected BlobKey(BlobType type, byte[] key, byte[] random) {
 		if (key == null || key.length != SIZE) {
 			throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
 		}
 
+		this.type = checkNotNull(type);
 		this.key = key;
+		this.random = new AbstractID(random);
 	}
 
 	/**
@@ -107,10 +134,10 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	@VisibleForTesting
 	static BlobKey createKey(BlobType type) {
 		if (type == PERMANENT_BLOB) {
-            return new PermanentBlobKey();
-        } else {
+			return new PermanentBlobKey();
+		} else {
 			return new TransientBlobKey();
-        }
+		}
 	}
 
 	/**
@@ -125,10 +152,30 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	 */
 	static BlobKey createKey(BlobType type, byte[] key) {
 		if (type == PERMANENT_BLOB) {
-            return new PermanentBlobKey(key);
-        } else {
+			return new PermanentBlobKey(key);
+		} else {
 			return new TransientBlobKey(key);
-        }
+		}
+	}
+
+	/**
+	 * Returns the right {@link BlobKey} subclass for the given parameters.
+	 *
+	 * @param type
+	 * 		whether the referenced BLOB is permanent or transient
+	 * @param key
+	 *        the actual key data
+	 * @param random
+	 *        the random component of the key
+	 *
+	 * @return BlobKey subclass
+	 */
+	static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
+		if (type == PERMANENT_BLOB) {
+			return new PermanentBlobKey(key, random);
+		} else {
+			return new TransientBlobKey(key, random);
+		}
 	}
 
 	/**
@@ -141,6 +188,15 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	}
 
 	/**
+	 * Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
+	 *
+	 * @return BLOB type, i.e. permanent or transient
+	 */
+	BlobType getType() {
+		return type;
+	}
+
+	/**
 	 * Adds the BLOB key to the given {@link MessageDigest}.
 	 * 
 	 * @param md
@@ -159,13 +215,16 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 
 		final BlobKey bk = (BlobKey) obj;
 
-		return Arrays.equals(this.key, bk.key) && this.type == bk.type;
+		return Arrays.equals(this.key, bk.key) &&
+			this.type == bk.type &&
+			this.random.equals(bk.random);
 	}
 
 	@Override
 	public int hashCode() {
 		int result = Arrays.hashCode(this.key);
 		result = 37 * result + this.type.hashCode();
+		result = 37 * result + this.random.hashCode();
 		return result;
 	}
 
@@ -183,7 +242,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 				// this actually never happens!
 				throw new IllegalStateException("Invalid BLOB type");
 		}
-		return typeString + StringUtils.byteToHexString(this.key);
+		return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
 	}
 
 	@Override
@@ -203,7 +262,13 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 
 		if (aarr.length == barr.length) {
 			// same hash contents - compare the BLOB types
-			return this.type.compareTo(o.type);
+			int typeCompare = this.type.compareTo(o.type);
+			if (typeCompare == 0) {
+				// same type - compare random components
+				return this.random.compareTo(o.random);
+			} else {
+				return typeCompare;
+			}
 		} else {
 			return aarr.length - barr.length;
 		}
@@ -223,6 +288,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
 
 		final byte[] key = new byte[BlobKey.SIZE];
+		final byte[] random = new byte[AbstractID.SIZE];
 
 		int bytesRead = 0;
 		// read key
@@ -233,6 +299,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 			}
 			bytesRead += read;
 		}
+
 		// read BLOB type
 		final BlobType blobType;
 		{
@@ -248,7 +315,17 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 			}
 		}
 
-		return createKey(blobType, key);
+		// read random component
+		bytesRead = 0;
+		while (bytesRead < AbstractID.SIZE) {
+			final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
+			if (read < 0) {
+				throw new EOFException("Read an incomplete BLOB key");
+			}
+			bytesRead += read;
+		}
+
+		return createKey(blobType, key, random);
 	}
 
 	/**
@@ -262,5 +339,6 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 	void writeToOutputStream(final OutputStream outputStream) throws IOException {
 		outputStream.write(this.key);
 		outputStream.write(this.type.ordinal());
+		outputStream.write(this.random.getBytes());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 7804dfd..bc61ef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -474,8 +474,13 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 				incomingFile = createTemporaryFilename();
 				blobStore.get(jobId, blobKey, incomingFile);
 
-				BlobUtils.moveTempFileToStore(
-					incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
+				readWriteLock.writeLock().lock();
+				try {
+					BlobUtils.moveTempFileToStore(
+						incomingFile, jobId, blobKey, localFile, LOG, null);
+				} finally {
+					readWriteLock.writeLock().unlock();
+				}
 
 				return;
 			} finally {
@@ -586,10 +591,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 			md.update(value);
 			fos.write(value);
 
-			blobKey = BlobKey.createKey(blobType, md.digest());
-
 			// persist file
-			moveTempFileToStore(incomingFile, jobId, blobKey);
+			blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
 
 			return blobKey;
 		} finally {
@@ -642,10 +645,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 				md.update(buf, 0, bytesRead);
 			}
 
-			blobKey = BlobKey.createKey(blobType, md.digest());
-
 			// persist file
-			moveTempFileToStore(incomingFile, jobId, blobKey);
+			blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
 
 			return blobKey;
 		} finally {
@@ -665,20 +666,53 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		temporary file created during transfer
 	 * @param jobId
 	 * 		ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
-	 * @param blobKey
-	 * 		BLOB key identifying the file
+	 * @param digest
+	 * 		BLOB content digest, i.e. hash
+	 * @param blobType
+	 * 		whether this file is a permanent or transient BLOB
+	 *
+	 * @return unique BLOB key that identifies the BLOB on the server
 	 *
 	 * @throws IOException
 	 * 		thrown if an I/O error occurs while moving the file or uploading it to the HA store
 	 */
-	void moveTempFileToStore(
-			File incomingFile, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
+	BlobKey moveTempFileToStore(
+			File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)
+			throws IOException {
+
+		int retries = 10;
 
-		File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
+		int attempt = 0;
+		while (true) {
+			// add unique component independent of the BLOB content
+			BlobKey blobKey = BlobKey.createKey(blobType, digest);
+			File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
 
-		BlobUtils.moveTempFileToStore(
-			incomingFile, jobId, blobKey, storageFile, readWriteLock.writeLock(), LOG,
-			blobKey instanceof PermanentBlobKey ? blobStore : null);
+			// try again until the key is unique (put the existence check into the lock!)
+			readWriteLock.writeLock().lock();
+			try {
+				if (!storageFile.exists()) {
+					BlobUtils.moveTempFileToStore(
+						incomingFile, jobId, blobKey, storageFile, LOG,
+						blobKey instanceof PermanentBlobKey ? blobStore : null);
+					return blobKey;
+				}
+			} finally {
+				readWriteLock.writeLock().unlock();
+			}
+
+			++attempt;
+			if (attempt >= retries) {
+				String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ".";
+				LOG.error(message + " No retries left.");
+				throw new IOException(message);
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})",
+						jobId, attempt, storageFile.getAbsolutePath());
+				}
+			}
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index be62581..fa8427e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -37,6 +37,8 @@ import java.security.MessageDigest;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
@@ -44,8 +46,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CON
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
@@ -346,9 +346,9 @@ class BlobServerConnection extends Thread {
 			}
 
 			incomingFile = blobServer.createTemporaryFilename();
-			BlobKey blobKey = readFileFully(inputStream, incomingFile, buf, blobType);
+			byte[] digest = readFileFully(inputStream, incomingFile, buf);
 
-			blobServer.moveTempFileToStore(incomingFile, jobId, blobKey);
+			BlobKey blobKey = blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType);
 
 			// Return computed key to client for validation
 			outputStream.write(RETURN_OKAY);
@@ -387,16 +387,14 @@ class BlobServerConnection extends Thread {
 	 * 		file to write to
 	 * @param buf
 	 * 		An auxiliary buffer for data serialization/deserialization
-	 * @param blobType
-	 * 		whether to make the data permanent or transient
 	 *
-	 * @return the received file's content hash as a BLOB key
+	 * @return the received file's content hash
 	 *
 	 * @throws IOException
 	 * 		thrown if an I/O error occurs while reading/writing data from/to the respective streams
 	 */
-	private static BlobKey readFileFully(
-			final InputStream inputStream, final File incomingFile, final byte[] buf, BlobKey.BlobType blobType)
+	private static byte[] readFileFully(
+			final InputStream inputStream, final File incomingFile, final byte[] buf)
 			throws IOException {
 		MessageDigest md = BlobUtils.createMessageDigest();
 
@@ -417,7 +415,7 @@ class BlobServerConnection extends Thread {
 
 				md.update(buf, 0, bytesExpected);
 			}
-			return BlobKey.createKey(blobType, md.digest());
+			return md.digest();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index d8223c8..04f2cdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
@@ -42,7 +43,6 @@ import java.nio.file.Files;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.UUID;
-import java.util.concurrent.locks.Lock;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
@@ -422,7 +422,7 @@ public class BlobUtils {
 
 	/**
 	 * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
-	 * use.
+	 * use (not thread-safe!).
 	 *
 	 * @param incomingFile
 	 * 		temporary file created during transfer
@@ -432,8 +432,6 @@ public class BlobUtils {
 	 * 		BLOB key identifying the file
 	 * @param storageFile
 	 *      (local) file where the blob is/should be stored
-	 * @param writeLock
-	 *      lock to acquire before doing the move
 	 * @param log
 	 *      logger for debug information
 	 * @param blobStore
@@ -444,9 +442,7 @@ public class BlobUtils {
 	 */
 	static void moveTempFileToStore(
 			File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile,
-			Lock writeLock, Logger log, @Nullable BlobStore blobStore) throws IOException {
-
-		writeLock.lock();
+			Logger log, @Nullable BlobStore blobStore) throws IOException {
 
 		try {
 			// first check whether the file already exists
@@ -483,8 +479,6 @@ public class BlobUtils {
 			if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
 				log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobId);
 			}
-
-			writeLock.unlock();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
index 2ad8f72..40732fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
@@ -42,4 +42,16 @@ public final class PermanentBlobKey extends BlobKey {
 	PermanentBlobKey(byte[] key) {
 		super(BlobType.PERMANENT_BLOB, key);
 	}
+
+	/**
+	 * Constructs a new BLOB key from the given byte array.
+	 *
+	 * @param key
+	 *        the actual key data
+	 * @param random
+	 *        the random component of the key
+	 */
+	PermanentBlobKey(byte[] key, byte[] random) {
+		super(BlobType.PERMANENT_BLOB, key, random);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
index 43e0f5f..15a9637 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
@@ -42,4 +42,16 @@ public final class TransientBlobKey extends BlobKey {
 	TransientBlobKey(byte[] key) {
 		super(BlobType.TRANSIENT_BLOB, key);
 	}
+
+	/**
+	 * Constructs a new BLOB key from the given byte array.
+	 *
+	 * @param key
+	 *        the actual key data
+	 * @param random
+	 *        the random component of the key
+	 */
+	TransientBlobKey(byte[] key, byte[] random) {
+		super(BlobType.TRANSIENT_BLOB, key, random);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index 54725e1..cf5bfcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -190,6 +190,9 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
 							//delete previous log file, if it is different than the current one
 							HashMap<String, TransientBlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : lastSubmittedStdout;
 							if (lastSubmittedFile.containsKey(taskManagerID)) {
+								// the BlobKey will almost certainly be different but the old file
+								// may not exist anymore so we cannot rely on it and need to
+								// download the new file anyway, even if the hashes match
 								if (!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) {
 									if (!blobCache.deleteFromCache(lastSubmittedFile.get(taskManagerID))) {
 										throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerID + '.'));

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
index a83d100..16ba020 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
@@ -51,7 +51,6 @@ import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -87,6 +86,12 @@ public class BlobCacheDeleteTest extends TestLogger {
 		testDelete(new JobID(), new JobID());
 	}
 
+	@Test
+	public void testDeleteTransient5() throws IOException {
+		JobID jobId = new JobID();
+		testDelete(jobId, jobId);
+	}
+
 	/**
 	 * Uploads a (different) byte array for each of the given jobs and verifies that deleting one of
 	 * them (via the {@link BlobCacheService}) does not influence the other.
@@ -98,7 +103,6 @@ public class BlobCacheDeleteTest extends TestLogger {
 	 */
 	private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2)
 			throws IOException {
-		final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2));
 
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -122,9 +126,10 @@ public class BlobCacheDeleteTest extends TestLogger {
 			// put two more BLOBs (same key, other key) for another job ID
 			TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB);
 			assertNotNull(key2a);
-			assertEquals(key1, key2a);
+			BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a);
 			TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB);
 			assertNotNull(key2b);
+			BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b);
 
 			// issue a DELETE request
 			assertTrue(delete(cache, jobId1, key1));
@@ -134,16 +139,15 @@ public class BlobCacheDeleteTest extends TestLogger {
 			// delete on server so that the cache cannot re-download
 			assertTrue(server.deleteInternal(jobId1, key1));
 			verifyDeleted(cache, jobId1, key1);
-			// deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different
-			if (!sameJobId) {
-				verifyContents(server, jobId2, key2a, data);
-			}
+			// deleting one BLOB should not affect another BLOB with a different key
+			// (and keys are always different now)
+			verifyContents(server, jobId2, key2a, data);
 			verifyContents(server, jobId2, key2b, data2);
 
 			// delete first file of second job
 			assertTrue(delete(cache, jobId2, key2a));
-			// delete only works on local cache (unless already deleted - key1 == key2a)!
-			assertTrue(sameJobId || server.getStorageLocation(jobId2, key2a).exists());
+			// delete only works on local cache
+			assertTrue(server.getStorageLocation(jobId2, key2a).exists());
 			// delete on server so that the cache cannot re-download
 			assertTrue(server.deleteInternal(jobId2, key2a));
 			verifyDeleted(cache, jobId2, key2a);
@@ -151,7 +155,7 @@ public class BlobCacheDeleteTest extends TestLogger {
 
 			// delete second file of second job
 			assertTrue(delete(cache, jobId2, key2b));
-			// delete only works on local cache (unless already deleted - key1 == key2a)!
+			// delete only works on local cache
 			assertTrue(server.getStorageLocation(jobId2, key2b).exists());
 			// delete on server so that the cache cannot re-download
 			assertTrue(server.deleteInternal(jobId2, key2b));

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
index bed27d8..c760d04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
@@ -40,7 +40,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.AccessDeniedException;
-import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -57,6 +56,7 @@ import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventu
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
 import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
@@ -152,37 +152,37 @@ public class BlobCacheGetTest extends TestLogger {
 
 			// add the same data under a second jobId
 			BlobKey key2 = put(server, jobId2, data, blobType);
-			assertNotNull(key);
-			assertEquals(key, key2);
+			assertNotNull(key2);
+			verifyKeyDifferentHashEquals(key, key2);
 
 			// request for jobId2 should succeed
-			get(cache, jobId2, key);
+			get(cache, jobId2, key2);
 			// request for jobId1 should still fail
 			verifyDeleted(cache, jobId1, key);
 
 			if (blobType == PERMANENT_BLOB) {
 				// still existing on server
-				assertTrue(server.getStorageLocation(jobId2, key).exists());
+				assertTrue(server.getStorageLocation(jobId2, key2).exists());
 				// delete jobId2 on cache
-				blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+				blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key2);
 				assertTrue(blobFile.delete());
 				// try to retrieve again
-				get(cache, jobId2, key);
+				get(cache, jobId2, key2);
 
 				// delete on cache and server, verify that it is not accessible anymore
-				blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+				blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key2);
 				assertTrue(blobFile.delete());
-				blobFile = server.getStorageLocation(jobId2, key);
+				blobFile = server.getStorageLocation(jobId2, key2);
 				assertTrue(blobFile.delete());
-				verifyDeleted(cache, jobId2, key);
+				verifyDeleted(cache, jobId2, key2);
 			} else {
 				// deleted eventually on the server by the GET request above
-				verifyDeletedEventually(server, jobId2, key);
+				verifyDeletedEventually(server, jobId2, key2);
 				// delete jobId2 on cache
-				blobFile = cache.getTransientBlobService().getStorageLocation(jobId2, key);
+				blobFile = cache.getTransientBlobService().getStorageLocation(jobId2, key2);
 				assertTrue(blobFile.delete());
 				// verify that it is not accessible anymore
-				verifyDeleted(cache, jobId2, key);
+				verifyDeleted(cache, jobId2, key2);
 			}
 		}
 	}
@@ -548,11 +548,6 @@ public class BlobCacheGetTest extends TestLogger {
 
 		final byte[] data = {1, 2, 3, 4, 99, 42};
 
-		MessageDigest md = BlobUtils.createMessageDigest();
-
-		// create the correct blob key by hashing our input data
-		final BlobKey blobKey = BlobKey.createKey(blobType, md.digest(data));
-
 		final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
 
 		try (
@@ -563,7 +558,7 @@ public class BlobCacheGetTest extends TestLogger {
 			server.start();
 
 			// upload data first
-			assertEquals(blobKey, put(server, jobId, data, blobType));
+			final BlobKey blobKey = put(server, jobId, data, blobType);
 
 			// now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
 			for (int i = 0; i < numberConcurrentGetOperations; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
index aa23c80..56258c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
@@ -58,6 +58,7 @@ import java.util.function.Supplier;
 import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.BlockingInputStream;
@@ -275,6 +276,11 @@ public class BlobCachePutTest extends TestLogger {
 			BlobKey key1a = put(cache, jobId1, data, blobType);
 			assertNotNull(key1a);
 			verifyType(blobType, key1a);
+			// second upload of same data should yield a different BlobKey
+			BlobKey key1a2 = put(cache, jobId1, data, blobType);
+			assertNotNull(key1a2);
+			verifyType(blobType, key1a2);
+			verifyKeyDifferentHashEquals(key1a, key1a2);
 
 			BlobKey key1b = put(cache, jobId1, data2, blobType);
 			assertNotNull(key1b);
@@ -282,19 +288,23 @@ public class BlobCachePutTest extends TestLogger {
 
 			// files should be available on the server
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
 			BlobKey key2a = put(cache, jobId2, data, blobType);
 			assertNotNull(key2a);
-			assertEquals(key1a, key2a);
+			verifyType(blobType, key2a);
+			verifyKeyDifferentHashEquals(key1a, key2a);
 
 			BlobKey key2b = put(cache, jobId2, data2, blobType);
 			assertNotNull(key2b);
-			assertEquals(key1b, key2b);
+			verifyType(blobType, key2b);
+			verifyKeyDifferentHashEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 			verifyContents(server, jobId2, key2a, data);
 			verifyContents(server, jobId2, key2b, data2);
@@ -382,26 +392,32 @@ public class BlobCachePutTest extends TestLogger {
 			TransientBlobKey key1a =
 				(TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data), TRANSIENT_BLOB);
 			assertNotNull(key1a);
+			// second upload of same data should yield a different BlobKey
+			BlobKey key1a2 = put(cache, jobId1, new ByteArrayInputStream(data), TRANSIENT_BLOB);
+			assertNotNull(key1a2);
+			verifyKeyDifferentHashEquals(key1a, key1a2);
 
 			TransientBlobKey key1b = (TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
 			assertNotNull(key1b);
 
 			// files should be available on the server
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
 			TransientBlobKey key2a =
 				(TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data), TRANSIENT_BLOB);
 			assertNotNull(key2a);
-			assertEquals(key1a, key2a);
+			verifyKeyDifferentHashEquals(key1a, key2a);
 
 			TransientBlobKey key2b = (TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
 			assertNotNull(key2b);
-			assertEquals(key1b, key2b);
+			verifyKeyDifferentHashEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 			verifyContents(server, jobId2, key2a, data);
 			verifyContents(server, jobId2, key2b, data2);
@@ -486,6 +502,10 @@ public class BlobCachePutTest extends TestLogger {
 			TransientBlobKey key1a =
 				(TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
 			assertNotNull(key1a);
+			// second upload of same data should yield a different BlobKey
+			BlobKey key1a2 = put(cache, jobId1, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
+			assertNotNull(key1a2);
+			verifyKeyDifferentHashEquals(key1a, key1a2);
 
 			TransientBlobKey key1b =
 				(TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
@@ -493,21 +513,23 @@ public class BlobCachePutTest extends TestLogger {
 
 			// files should be available on the server
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
 			TransientBlobKey key2a =
 				(TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
 			assertNotNull(key2a);
-			assertEquals(key1a, key2a);
+			verifyKeyDifferentHashEquals(key1a, key2a);
 
 			TransientBlobKey key2b =
 				(TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
 			assertNotNull(key2b);
-			assertEquals(key1b, key2b);
+			verifyKeyDifferentHashEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 			verifyContents(server, jobId2, key2a, data);
 			verifyContents(server, jobId2, key2b, data2);
@@ -858,7 +880,8 @@ public class BlobCachePutTest extends TestLogger {
 
 			// make sure that all blob keys are the same
 			while (blobKeyIterator.hasNext()) {
-				assertEquals(blobKey, blobKeyIterator.next());
+				// check for unique BlobKey, but should have same hash
+				verifyKeyDifferentHashEquals(blobKey, blobKeyIterator.next());
 			}
 
 			// check the uploaded file's contents

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
index 1a3f161..e275949 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
@@ -38,13 +38,11 @@ import java.util.Random;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashDifferent;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -131,10 +129,8 @@ public class BlobCacheRecoveryTest extends TestLogger {
 
 			// put non-HA data
 			nonHAKey = put(cache0, jobId[0], expected2, TRANSIENT_BLOB);
-			assertNotEquals(keys[0], nonHAKey);
-			assertThat(keys[0].getHash(), not(equalTo(nonHAKey.getHash())));
-			assertNotEquals(keys[1], nonHAKey);
-			assertThat(keys[1].getHash(), equalTo(nonHAKey.getHash()));
+			verifyKeyDifferentHashDifferent(keys[0], nonHAKey);
+			verifyKeyDifferentHashEquals(keys[1], nonHAKey);
 
 			// check that the storage directory exists
 			final Path blobServerPath = new Path(storagePath, "blob");

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 0e97604..9e4f4b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -47,6 +47,7 @@ import java.util.Random;
 import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -113,15 +114,13 @@ public class BlobClientTest extends TestLogger {
 	 *
 	 * @param file
 	 * 		the file to prepare for the unit tests
-	 * @param blobType
-	 * 		whether the BLOB should become permanent or transient
 	 *
 	 * @return the BLOB key of the prepared file
 	 *
 	 * @throws IOException
 	 * 		thrown if an I/O error occurs while writing to the test file
 	 */
-	private static BlobKey prepareTestFile(File file, BlobKey.BlobType blobType) throws IOException {
+	private static byte[] prepareTestFile(File file) throws IOException {
 
 		MessageDigest md = BlobUtils.createMessageDigest();
 
@@ -145,7 +144,7 @@ public class BlobClientTest extends TestLogger {
 			}
 		}
 
-		return BlobKey.createKey(blobType, md.digest());
+		return md.digest();
 	}
 
 	/**
@@ -254,35 +253,38 @@ public class BlobClientTest extends TestLogger {
 			byte[] testBuffer = createTestBuffer();
 			MessageDigest md = BlobUtils.createMessageDigest();
 			md.update(testBuffer);
-			BlobKey origKey = BlobKey.createKey(blobType, md.digest());
+			byte[] digest = md.digest();
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
 			client = new BlobClient(serverAddress, getBlobClientConfig());
 
 			JobID jobId = new JobID();
-			BlobKey receivedKey;
 
 			// Store the data (job-unrelated)
+			BlobKey receivedKey1 = null;
 			if (blobType == TRANSIENT_BLOB) {
-				receivedKey = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType);
-				assertEquals(origKey, receivedKey);
+				receivedKey1 = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType);
+				assertArrayEquals(digest, receivedKey1.getHash());
 			}
 
 			// try again with a job-related BLOB:
-			receivedKey = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType);
-			assertEquals(origKey, receivedKey);
+			BlobKey receivedKey2 = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType);
+			assertArrayEquals(digest, receivedKey2.getHash());
+			if (blobType == TRANSIENT_BLOB) {
+				verifyKeyDifferentHashEquals(receivedKey1, receivedKey2);
+			}
 
 			// Retrieve the data (job-unrelated)
 			if (blobType == TRANSIENT_BLOB) {
-				validateGetAndClose(client.getInternal(null, receivedKey), testBuffer);
+				validateGetAndClose(client.getInternal(null, receivedKey1), testBuffer);
 				// transient BLOBs should be deleted from the server, eventually
-				verifyDeletedEventually(getBlobServer(), null, receivedKey);
+				verifyDeletedEventually(getBlobServer(), null, receivedKey1);
 			}
 			// job-related
-			validateGetAndClose(client.getInternal(jobId, receivedKey), testBuffer);
+			validateGetAndClose(client.getInternal(jobId, receivedKey2), testBuffer);
 			if (blobType == TRANSIENT_BLOB) {
 				// transient BLOBs should be deleted from the server, eventually
-				verifyDeletedEventually(getBlobServer(), jobId, receivedKey);
+				verifyDeletedEventually(getBlobServer(), jobId, receivedKey2);
 			}
 
 			// Check reaction to invalid keys for job-unrelated blobs
@@ -342,41 +344,42 @@ public class BlobClientTest extends TestLogger {
 			throws IOException, InterruptedException {
 
 		File testFile = temporaryFolder.newFile();
-		BlobKey origKey = prepareTestFile(testFile, blobType);
+		byte[] digest = prepareTestFile(testFile);
 
 		InputStream is = null;
 
 		try (BlobClient client = new BlobClient(new InetSocketAddress("localhost", getBlobServer().getPort()), getBlobClientConfig())) {
 
 			JobID jobId = new JobID();
-			BlobKey receivedKey;
+			BlobKey receivedKey1 = null;
 
 			// Store the data (job-unrelated)
 			if (blobType == TRANSIENT_BLOB) {
 				is = new FileInputStream(testFile);
-				receivedKey = client.putInputStream(null, is, blobType);
-				assertEquals(origKey, receivedKey);
+				receivedKey1 = client.putInputStream(null, is, blobType);
+				assertArrayEquals(digest, receivedKey1.getHash());
 			}
 
 			// try again with a job-related BLOB:
 			is = new FileInputStream(testFile);
-			receivedKey = client.putInputStream(jobId, is, blobType);
-			assertEquals(origKey, receivedKey);
+			BlobKey receivedKey2 = client.putInputStream(jobId, is, blobType);
 
 			is.close();
 			is = null;
 
 			// Retrieve the data (job-unrelated)
 			if (blobType == TRANSIENT_BLOB) {
-				validateGetAndClose(client.getInternal(null, receivedKey), testFile);
+				verifyKeyDifferentHashEquals(receivedKey1, receivedKey2);
+
+				validateGetAndClose(client.getInternal(null, receivedKey1), testFile);
 				// transient BLOBs should be deleted from the server, eventually
-				verifyDeletedEventually(getBlobServer(), null, receivedKey);
+				verifyDeletedEventually(getBlobServer(), null, receivedKey1);
 			}
 			// job-related
-			validateGetAndClose(client.getInternal(jobId, receivedKey), testFile);
+			validateGetAndClose(client.getInternal(jobId, receivedKey2), testFile);
 			if (blobType == TRANSIENT_BLOB) {
 				// transient BLOBs should be deleted from the server, eventually
-				verifyDeletedEventually(getBlobServer(), jobId, receivedKey);
+				verifyDeletedEventually(getBlobServer(), jobId, receivedKey2);
 			}
 		} finally {
 			if (is != null) {
@@ -463,7 +466,7 @@ public class BlobClientTest extends TestLogger {
 	static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {
 		final File testFile = File.createTempFile("testfile", ".dat");
 		testFile.deleteOnExit();
-		prepareTestFile(testFile, PERMANENT_BLOB);
+		prepareTestFile(testFile);
 
 		InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 49f4fc2..ae538aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -29,14 +30,17 @@ import java.io.IOException;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -46,21 +50,35 @@ public final class BlobKeyTest extends TestLogger {
 	/**
 	 * The first key array to be used during the unit tests.
 	 */
-	private static final byte[] KEY_ARRAY_1 = new byte[20];
+	private static final byte[] KEY_ARRAY_1 = new byte[BlobKey.SIZE];
 
 	/**
 	 * The second key array to be used during the unit tests.
 	 */
-	private static final byte[] KEY_ARRAY_2 = new byte[20];
+	private static final byte[] KEY_ARRAY_2 = new byte[BlobKey.SIZE];
+
+	/**
+	 * First byte array to use for the random component of a {@link BlobKey}.
+	 */
+	private static final byte[] RANDOM_ARRAY_1 = new byte[AbstractID.SIZE];
+
+	/**
+	 * Second byte array to use for the random component of a {@link BlobKey}.
+	 */
+	private static final byte[] RANDOM_ARRAY_2 = new byte[AbstractID.SIZE];
 
 	/*
-	 * Initialize the key array.
+	 * Initialize the key and random arrays.
 	 */
 	static {
 		for (int i = 0; i < KEY_ARRAY_1.length; ++i) {
 			KEY_ARRAY_1[i] = (byte) i;
 			KEY_ARRAY_2[i] = (byte) (i + 1);
 		}
+		for (int i = 0; i < RANDOM_ARRAY_1.length; ++i) {
+			RANDOM_ARRAY_1[i] = (byte) i;
+			RANDOM_ARRAY_2[i] = (byte) (i + 1);
+		}
 	}
 
 	@Test
@@ -89,7 +107,7 @@ public final class BlobKeyTest extends TestLogger {
 	 * Tests the serialization/deserialization of BLOB keys.
 	 */
 	private void testSerialization(BlobKey.BlobType blobType) throws Exception {
-		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
 		final BlobKey k2 = CommonTestUtils.createCopySerializable(k1);
 		assertEquals(k1, k2);
 		assertEquals(k1.hashCode(), k2.hashCode());
@@ -107,16 +125,25 @@ public final class BlobKeyTest extends TestLogger {
 	}
 
 	/**
-	 * Tests the equals method.
+	 * Tests the {@link BlobKey#equals(Object)} and {@link BlobKey#hashCode()} methods.
 	 */
 	private void testEquals(BlobKey.BlobType blobType) {
-		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
-		final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
-		final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
+		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
+		final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
+		final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, RANDOM_ARRAY_1);
+		final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_2);
 		assertTrue(k1.equals(k2));
 		assertTrue(k2.equals(k1));
+		assertEquals(k1.hashCode(), k2.hashCode());
 		assertFalse(k1.equals(k3));
 		assertFalse(k3.equals(k1));
+		assertFalse(k1.equals(k4));
+		assertFalse(k4.equals(k1));
+
+		//noinspection ObjectEqualsNull
+		assertFalse(k1.equals(null));
+		//noinspection EqualsBetweenInconvertibleTypes
+		assertFalse(k1.equals(this));
 	}
 
 	/**
@@ -124,8 +151,8 @@ public final class BlobKeyTest extends TestLogger {
 	 */
 	@Test
 	public void testEqualsDifferentBlobType() {
-		final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
-		final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+		final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1);
+		final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1);
 		assertFalse(k1.equals(k2));
 		assertFalse(k2.equals(k1));
 	}
@@ -144,19 +171,22 @@ public final class BlobKeyTest extends TestLogger {
 	 * Tests the compares method.
 	 */
 	private void testCompares(BlobKey.BlobType blobType) {
-		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
-		final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
-		final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
+		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
+		final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
+		final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, RANDOM_ARRAY_1);
+		final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_2);
 		assertThat(k1.compareTo(k2), is(0));
 		assertThat(k2.compareTo(k1), is(0));
 		assertThat(k1.compareTo(k3), lessThan(0));
+		assertThat(k1.compareTo(k4), lessThan(0));
 		assertThat(k3.compareTo(k1), greaterThan(0));
+		assertThat(k4.compareTo(k1), greaterThan(0));
 	}
 
 	@Test
 	public void testComparesDifferentBlobType() {
-		final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
-		final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+		final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1);
+		final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1);
 		assertThat(k1.compareTo(k2), greaterThan(0));
 		assertThat(k2.compareTo(k1), lessThan(0));
 	}
@@ -175,7 +205,7 @@ public final class BlobKeyTest extends TestLogger {
 	 * Test the serialization/deserialization using input/output streams.
 	 */
 	private void testStreams(BlobKey.BlobType blobType) throws IOException {
-		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
 		final ByteArrayOutputStream baos = new ByteArrayOutputStream(20);
 
 		k1.writeToOutputStream(baos);
@@ -188,6 +218,28 @@ public final class BlobKeyTest extends TestLogger {
 	}
 
 	/**
+	 * Verifies that the two given keys are different in total but share the same hash.
+	 *
+	 * @param key1 first blob key
+	 * @param key2 second blob key
+	 */
+	static void verifyKeyDifferentHashEquals(BlobKey key1, BlobKey key2) {
+		assertNotEquals(key1, key2);
+		assertThat(key1.getHash(), equalTo(key2.getHash()));
+	}
+
+	/**
+	 * Verifies that the two given keys are different in total and also have different hashes.
+	 *
+	 * @param key1 first blob key
+	 * @param key2 second blob key
+	 */
+	static void verifyKeyDifferentHashDifferent(BlobKey key1, BlobKey key2) {
+		assertNotEquals(key1, key2);
+		assertThat(key1.getHash(), not(equalTo(key2.getHash())));
+	}
+
+	/**
 	 * Verifies that the given <tt>key</tt> is of an expected type.
 	 *
 	 * @param expected the type the key should have

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index a110d4a..fde21ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -47,10 +47,11 @@ import java.util.concurrent.Executors;
 import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -86,6 +87,12 @@ public class BlobServerDeleteTest extends TestLogger {
 		testDeleteTransient(new JobID(), new JobID());
 	}
 
+	@Test
+	public void testDeleteTransient5() throws IOException {
+		JobID jobId = new JobID();
+		testDeleteTransient(jobId, jobId);
+	}
+
 	/**
 	 * Uploads a (different) byte array for each of the given jobs and verifies that deleting one of
 	 * them (via the {@link BlobServer}) does not influence the other.
@@ -97,7 +104,6 @@ public class BlobServerDeleteTest extends TestLogger {
 	 */
 	private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2)
 			throws IOException {
-		final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2));
 
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -118,7 +124,7 @@ public class BlobServerDeleteTest extends TestLogger {
 			// put two more BLOBs (same key, other key) for another job ID
 			TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB);
 			assertNotNull(key2a);
-			assertEquals(key1, key2a);
+			verifyKeyDifferentHashEquals(key1, key2a);
 			TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB);
 			assertNotNull(key2b);
 
@@ -126,10 +132,9 @@ public class BlobServerDeleteTest extends TestLogger {
 			assertTrue(delete(server, jobId1, key1));
 
 			verifyDeleted(server, jobId1, key1);
-			// deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different
-			if (!sameJobId) {
-				verifyContents(server, jobId2, key2a, data);
-			}
+			// deleting one BLOB should not affect another BLOB with a different key
+			// (and keys are always different now)
+			verifyContents(server, jobId2, key2a, data);
 			verifyContents(server, jobId2, key2b, data2);
 
 			// delete first file of second job
@@ -284,7 +289,7 @@ public class BlobServerDeleteTest extends TestLogger {
 
 			BlobKey key1a = put(server, jobId1, data, blobType);
 			BlobKey key2 = put(server, jobId2, data, blobType);
-			assertEquals(key1a, key2);
+			assertArrayEquals(key1a.getHash(), key2.getHash());
 
 			BlobKey key1b = put(server, jobId1, data2, blobType);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 4927279..e3b5309 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -42,7 +42,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.NoSuchFileException;
-import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -58,6 +57,7 @@ import java.util.concurrent.Executors;
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX;
 import static org.junit.Assert.assertArrayEquals;
@@ -141,18 +141,18 @@ public class BlobServerGetTest extends TestLogger {
 
 			// add the same data under a second jobId
 			BlobKey key2 = put(server, jobId2, data, blobType);
-			assertNotNull(key);
-			assertEquals(key, key2);
+			assertNotNull(key2);
+			verifyKeyDifferentHashEquals(key, key2);
 
 			// request for jobId2 should succeed
-			get(server, jobId2, key);
+			get(server, jobId2, key2);
 			// request for jobId1 should still fail
 			verifyDeleted(server, jobId1, key);
 
 			// same checks as for jobId1 but for jobId2 should also work:
-			blobFile = server.getStorageLocation(jobId2, key);
+			blobFile = server.getStorageLocation(jobId2, key2);
 			assertTrue(blobFile.delete());
-			verifyDeleted(server, jobId2, key);
+			verifyDeleted(server, jobId2, key2);
 		}
 	}
 
@@ -373,11 +373,6 @@ public class BlobServerGetTest extends TestLogger {
 
 		final byte[] data = {1, 2, 3, 4, 99, 42};
 
-		MessageDigest md = BlobUtils.createMessageDigest();
-
-		// create the correct blob key by hashing our input data
-		final BlobKey blobKey = BlobKey.createKey(blobType, md.digest(data));
-
 		doAnswer(
 			new Answer() {
 				@Override
@@ -398,7 +393,7 @@ public class BlobServerGetTest extends TestLogger {
 			server.start();
 
 			// upload data first
-			assertEquals(blobKey, put(server, jobId, data, blobType));
+			final BlobKey blobKey = put(server, jobId, data, blobType);
 
 			// now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
 			if (blobType == PERMANENT_BLOB) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index aefd0a3..dcf49a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -62,6 +62,7 @@ import java.util.concurrent.Executors;
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -226,21 +227,26 @@ public class BlobServerPutTest extends TestLogger {
 			// put data for jobId1 and verify
 			BlobKey key1a = put(server, jobId1, data, blobType);
 			assertNotNull(key1a);
+			// second upload of same data should yield a different BlobKey
+			BlobKey key1a2 = put(server, jobId1, data, blobType);
+			assertNotNull(key1a2);
+			verifyKeyDifferentHashEquals(key1a, key1a2);
 
 			BlobKey key1b = put(server, jobId1, data2, blobType);
 			assertNotNull(key1b);
 
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
 			BlobKey key2a = put(server, jobId2, data, blobType);
 			assertNotNull(key2a);
-			assertEquals(key1a, key2a);
+			verifyKeyDifferentHashEquals(key1a, key2a);
 
 			BlobKey key2b = put(server, jobId2, data2, blobType);
 			assertNotNull(key2b);
-			assertEquals(key1b, key2b);
+			verifyKeyDifferentHashEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
 			verifyContents(server, jobId2, key2a, data);
@@ -249,6 +255,7 @@ public class BlobServerPutTest extends TestLogger {
 			// verify the accessibility and the BLOB contents one more time (transient BLOBs should
 			// not be deleted here)
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 			verifyContents(server, jobId2, key2a, data);
 			verifyContents(server, jobId2, key2b, data2);
@@ -311,21 +318,26 @@ public class BlobServerPutTest extends TestLogger {
 			// put data for jobId1 and verify
 			BlobKey key1a = put(server, jobId1, new ByteArrayInputStream(data), blobType);
 			assertNotNull(key1a);
+			// second upload of same data should yield a different BlobKey
+			BlobKey key1a2 = put(server, jobId1, new ByteArrayInputStream(data), blobType);
+			assertNotNull(key1a2);
+			verifyKeyDifferentHashEquals(key1a, key1a2);
 
 			BlobKey key1b = put(server, jobId1, new ByteArrayInputStream(data2), blobType);
 			assertNotNull(key1b);
 
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
 			BlobKey key2a = put(server, jobId2, new ByteArrayInputStream(data), blobType);
 			assertNotNull(key2a);
-			assertEquals(key1a, key2a);
+			verifyKeyDifferentHashEquals(key1a, key2a);
 
 			BlobKey key2b = put(server, jobId2, new ByteArrayInputStream(data2), blobType);
 			assertNotNull(key2b);
-			assertEquals(key1b, key2b);
+			verifyKeyDifferentHashEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
 			verifyContents(server, jobId2, key2a, data);
@@ -334,6 +346,7 @@ public class BlobServerPutTest extends TestLogger {
 			// verify the accessibility and the BLOB contents one more time (transient BLOBs should
 			// not be deleted here)
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 			verifyContents(server, jobId2, key2a, data);
 			verifyContents(server, jobId2, key2b, data2);
@@ -396,21 +409,26 @@ public class BlobServerPutTest extends TestLogger {
 			// put data for jobId1 and verify
 			BlobKey key1a = put(server, jobId1, new ChunkedInputStream(data, 19), blobType);
 			assertNotNull(key1a);
+			// second upload of same data should yield a different BlobKey
+			BlobKey key1a2 = put(server, jobId1, new ChunkedInputStream(data, 19), blobType);
+			assertNotNull(key1a2);
+			verifyKeyDifferentHashEquals(key1a, key1a2);
 
 			BlobKey key1b = put(server, jobId1, new ChunkedInputStream(data2, 19), blobType);
 			assertNotNull(key1b);
 
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
 			BlobKey key2a = put(server, jobId2, new ChunkedInputStream(data, 19), blobType);
 			assertNotNull(key2a);
-			assertEquals(key1a, key2a);
+			verifyKeyDifferentHashEquals(key1a, key2a);
 
 			BlobKey key2b = put(server, jobId2, new ChunkedInputStream(data2, 19), blobType);
 			assertNotNull(key2b);
-			assertEquals(key1b, key2b);
+			verifyKeyDifferentHashEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
 			verifyContents(server, jobId2, key2a, data);
@@ -419,6 +437,7 @@ public class BlobServerPutTest extends TestLogger {
 			// verify the accessibility and the BLOB contents one more time (transient BLOBs should
 			// not be deleted here)
 			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1a2, data);
 			verifyContents(server, jobId1, key1b, data2);
 			verifyContents(server, jobId2, key2a, data);
 			verifyContents(server, jobId2, key2b, data2);
@@ -700,7 +719,7 @@ public class BlobServerPutTest extends TestLogger {
 
 			// make sure that all blob keys are the same
 			while (blobKeyIterator.hasNext()) {
-				assertEquals(blobKey, blobKeyIterator.next());
+				verifyKeyDifferentHashEquals(blobKey, blobKeyIterator.next());
 			}
 
 			// check the uploaded file's contents

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
index 35575b5..5b8d0e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
@@ -39,12 +39,10 @@ import java.util.Random;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -126,8 +124,7 @@ public class BlobServerRecoveryTest extends TestLogger {
 
 			// put non-HA data
 			nonHAKey = put(server0, jobId[0], expected2, TRANSIENT_BLOB);
-			assertNotEquals(keys[1], nonHAKey);
-			assertThat(keys[1].getHash(), equalTo(nonHAKey.getHash()));
+			verifyKeyDifferentHashEquals(keys[1], nonHAKey);
 
 			// check that the storage directory exists
 			final Path blobServerPath = new Path(storagePath, "blob");