You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/16 07:28:40 UTC
flink git commit: [FLINK-7140][blob] add an additional random
component into the BlobKey
Repository: flink
Updated Branches:
refs/heads/master b2b94632d -> f853f3359
[FLINK-7140][blob] add an additional random component into the BlobKey
This should guard us from uploading (and deleting) the same file more than
once and also from hash collisions.
This closes #4359.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f853f335
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f853f335
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f853f335
Branch: refs/heads/master
Commit: f853f33593373f75a72351f5256564533bc063f9
Parents: b2b9463
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Oct 5 09:33:27 2017 +0200
Committer: Till <ti...@gmail.com>
Committed: Mon Oct 16 09:23:37 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/blob/AbstractBlobCache.java | 18 +++-
.../apache/flink/runtime/blob/BlobClient.java | 12 ++-
.../org/apache/flink/runtime/blob/BlobKey.java | 100 +++++++++++++++++--
.../apache/flink/runtime/blob/BlobServer.java | 66 +++++++++---
.../runtime/blob/BlobServerConnection.java | 18 ++--
.../apache/flink/runtime/blob/BlobUtils.java | 12 +--
.../flink/runtime/blob/PermanentBlobKey.java | 12 +++
.../flink/runtime/blob/TransientBlobKey.java | 12 +++
.../handler/legacy/TaskManagerLogHandler.java | 3 +
.../flink/runtime/blob/BlobCacheDeleteTest.java | 24 +++--
.../flink/runtime/blob/BlobCacheGetTest.java | 33 +++---
.../flink/runtime/blob/BlobCachePutTest.java | 37 +++++--
.../runtime/blob/BlobCacheRecoveryTest.java | 12 +--
.../flink/runtime/blob/BlobClientTest.java | 53 +++++-----
.../apache/flink/runtime/blob/BlobKeyTest.java | 84 +++++++++++++---
.../runtime/blob/BlobServerDeleteTest.java | 21 ++--
.../flink/runtime/blob/BlobServerGetTest.java | 19 ++--
.../flink/runtime/blob/BlobServerPutTest.java | 33 ++++--
.../runtime/blob/BlobServerRecoveryTest.java | 7 +-
19 files changed, 405 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
index dc031e0..729ac9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -159,8 +159,13 @@ public abstract class AbstractBlobCache implements Closeable {
try {
if (blobView.get(jobId, blobKey, incomingFile)) {
// now move the temp file to our local cache atomically
- BlobUtils.moveTempFileToStore(
- incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
+ readWriteLock.writeLock().lock();
+ try {
+ BlobUtils.moveTempFileToStore(
+ incomingFile, jobId, blobKey, localFile, log, null);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
return localFile;
}
@@ -172,8 +177,13 @@ public abstract class AbstractBlobCache implements Closeable {
BlobClient.downloadFromBlobServer(
jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries);
- BlobUtils.moveTempFileToStore(
- incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
+ readWriteLock.writeLock().lock();
+ try {
+ BlobUtils.moveTempFileToStore(
+ incomingFile, jobId, blobKey, localFile, log, null);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
return localFile;
} finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 3154f69..fbcce58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -47,9 +47,11 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.MessageDigest;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
@@ -57,7 +59,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CON
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
@@ -496,13 +497,16 @@ public final class BlobClient implements Closeable {
else if (response == RETURN_OKAY) {
BlobKey remoteKey = BlobKey.readFromInputStream(is);
- BlobKey localKey = BlobKey.createKey(blobType, md.digest());
+ byte[] localHash = md.digest();
- if (!localKey.equals(remoteKey)) {
+ if (blobType != remoteKey.getType()) {
+ throw new IOException("Detected data corruption during transfer");
+ }
+ if (!Arrays.equals(localHash, remoteKey.getHash())) {
throw new IOException("Detected data corruption during transfer");
}
- return localKey;
+ return remoteKey;
}
else if (response == RETURN_ERROR) {
Throwable cause = readExceptionFromStream(is);
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index 0aa45e1..ef2d64d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils;
import java.io.EOFException;
@@ -41,7 +42,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
private static final long serialVersionUID = 3847117712521785209L;
/** Size of the internal BLOB key in bytes. */
- private static final int SIZE = 20;
+ public static final int SIZE = 20;
/** The byte buffer storing the actual key data. */
private final byte[] key;
@@ -68,6 +69,11 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
}
/**
+ * Random component of the key.
+ */
+ private final AbstractID random;
+
+ /**
* Constructs a new BLOB key.
*
* @param type
@@ -76,6 +82,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
protected BlobKey(BlobType type) {
this.type = checkNotNull(type);
this.key = new byte[SIZE];
+ this.random = new AbstractID();
}
/**
@@ -87,13 +94,33 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
* the actual key data
*/
protected BlobKey(BlobType type, byte[] key) {
+ if (key == null || key.length != SIZE) {
+ throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
+ }
+
this.type = checkNotNull(type);
+ this.key = key;
+ this.random = new AbstractID();
+ }
+ /**
+ * Constructs a new BLOB key from the given byte array.
+ *
+ * @param type
+ * whether the referenced BLOB is permanent or transient
+ * @param key
+ * the actual key data
+ * @param random
+ * the random component of the key
+ */
+ protected BlobKey(BlobType type, byte[] key, byte[] random) {
if (key == null || key.length != SIZE) {
throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
}
+ this.type = checkNotNull(type);
this.key = key;
+ this.random = new AbstractID(random);
}
/**
@@ -107,10 +134,10 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
@VisibleForTesting
static BlobKey createKey(BlobType type) {
if (type == PERMANENT_BLOB) {
- return new PermanentBlobKey();
- } else {
+ return new PermanentBlobKey();
+ } else {
return new TransientBlobKey();
- }
+ }
}
/**
@@ -125,10 +152,30 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
*/
static BlobKey createKey(BlobType type, byte[] key) {
if (type == PERMANENT_BLOB) {
- return new PermanentBlobKey(key);
- } else {
+ return new PermanentBlobKey(key);
+ } else {
return new TransientBlobKey(key);
- }
+ }
+ }
+
+ /**
+ * Returns the right {@link BlobKey} subclass for the given parameters.
+ *
+ * @param type
+ * whether the referenced BLOB is permanent or transient
+ * @param key
+ * the actual key data
+ * @param random
+ * the random component of the key
+ *
+ * @return BlobKey subclass
+ */
+ static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
+ if (type == PERMANENT_BLOB) {
+ return new PermanentBlobKey(key, random);
+ } else {
+ return new TransientBlobKey(key, random);
+ }
}
/**
@@ -141,6 +188,15 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
}
/**
+ * Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
+ *
+ * @return BLOB type, i.e. permanent or transient
+ */
+ BlobType getType() {
+ return type;
+ }
+
+ /**
* Adds the BLOB key to the given {@link MessageDigest}.
*
* @param md
@@ -159,13 +215,16 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
final BlobKey bk = (BlobKey) obj;
- return Arrays.equals(this.key, bk.key) && this.type == bk.type;
+ return Arrays.equals(this.key, bk.key) &&
+ this.type == bk.type &&
+ this.random.equals(bk.random);
}
@Override
public int hashCode() {
int result = Arrays.hashCode(this.key);
result = 37 * result + this.type.hashCode();
+ result = 37 * result + this.random.hashCode();
return result;
}
@@ -183,7 +242,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
// this actually never happens!
throw new IllegalStateException("Invalid BLOB type");
}
- return typeString + StringUtils.byteToHexString(this.key);
+ return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
}
@Override
@@ -203,7 +262,13 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
if (aarr.length == barr.length) {
// same hash contents - compare the BLOB types
- return this.type.compareTo(o.type);
+ int typeCompare = this.type.compareTo(o.type);
+ if (typeCompare == 0) {
+ // same type - compare random components
+ return this.random.compareTo(o.random);
+ } else {
+ return typeCompare;
+ }
} else {
return aarr.length - barr.length;
}
@@ -223,6 +288,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
final byte[] key = new byte[BlobKey.SIZE];
+ final byte[] random = new byte[AbstractID.SIZE];
int bytesRead = 0;
// read key
@@ -233,6 +299,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
}
bytesRead += read;
}
+
// read BLOB type
final BlobType blobType;
{
@@ -248,7 +315,17 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
}
}
- return createKey(blobType, key);
+ // read random component
+ bytesRead = 0;
+ while (bytesRead < AbstractID.SIZE) {
+ final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
+ if (read < 0) {
+ throw new EOFException("Read an incomplete BLOB key");
+ }
+ bytesRead += read;
+ }
+
+ return createKey(blobType, key, random);
}
/**
@@ -262,5 +339,6 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
void writeToOutputStream(final OutputStream outputStream) throws IOException {
outputStream.write(this.key);
outputStream.write(this.type.ordinal());
+ outputStream.write(this.random.getBytes());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 7804dfd..bc61ef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -474,8 +474,13 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
incomingFile = createTemporaryFilename();
blobStore.get(jobId, blobKey, incomingFile);
- BlobUtils.moveTempFileToStore(
- incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
+ readWriteLock.writeLock().lock();
+ try {
+ BlobUtils.moveTempFileToStore(
+ incomingFile, jobId, blobKey, localFile, LOG, null);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
return;
} finally {
@@ -586,10 +591,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
md.update(value);
fos.write(value);
- blobKey = BlobKey.createKey(blobType, md.digest());
-
// persist file
- moveTempFileToStore(incomingFile, jobId, blobKey);
+ blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
return blobKey;
} finally {
@@ -642,10 +645,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
md.update(buf, 0, bytesRead);
}
- blobKey = BlobKey.createKey(blobType, md.digest());
-
// persist file
- moveTempFileToStore(incomingFile, jobId, blobKey);
+ blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
return blobKey;
} finally {
@@ -665,20 +666,53 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
* temporary file created during transfer
* @param jobId
* ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
- * @param blobKey
- * BLOB key identifying the file
+ * @param digest
+ * BLOB content digest, i.e. hash
+ * @param blobType
+ * whether this file is a permanent or transient BLOB
+ *
+ * @return unique BLOB key that identifies the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while moving the file or uploading it to the HA store
*/
- void moveTempFileToStore(
- File incomingFile, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
+ BlobKey moveTempFileToStore(
+ File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)
+ throws IOException {
+
+ int retries = 10;
- File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
+ int attempt = 0;
+ while (true) {
+ // add unique component independent of the BLOB content
+ BlobKey blobKey = BlobKey.createKey(blobType, digest);
+ File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
- BlobUtils.moveTempFileToStore(
- incomingFile, jobId, blobKey, storageFile, readWriteLock.writeLock(), LOG,
- blobKey instanceof PermanentBlobKey ? blobStore : null);
+ // try again until the key is unique (put the existence check into the lock!)
+ readWriteLock.writeLock().lock();
+ try {
+ if (!storageFile.exists()) {
+ BlobUtils.moveTempFileToStore(
+ incomingFile, jobId, blobKey, storageFile, LOG,
+ blobKey instanceof PermanentBlobKey ? blobStore : null);
+ return blobKey;
+ }
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ ++attempt;
+ if (attempt >= retries) {
+ String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ".";
+ LOG.error(message + " No retries left.");
+ throw new IOException(message);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})",
+ jobId, attempt, storageFile.getAbsolutePath());
+ }
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index be62581..fa8427e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -37,6 +37,8 @@ import java.security.MessageDigest;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
@@ -44,8 +46,6 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CON
import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
import static org.apache.flink.runtime.blob.BlobUtils.readFully;
import static org.apache.flink.runtime.blob.BlobUtils.readLength;
@@ -346,9 +346,9 @@ class BlobServerConnection extends Thread {
}
incomingFile = blobServer.createTemporaryFilename();
- BlobKey blobKey = readFileFully(inputStream, incomingFile, buf, blobType);
+ byte[] digest = readFileFully(inputStream, incomingFile, buf);
- blobServer.moveTempFileToStore(incomingFile, jobId, blobKey);
+ BlobKey blobKey = blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType);
// Return computed key to client for validation
outputStream.write(RETURN_OKAY);
@@ -387,16 +387,14 @@ class BlobServerConnection extends Thread {
* file to write to
* @param buf
* An auxiliary buffer for data serialization/deserialization
- * @param blobType
- * whether to make the data permanent or transient
*
- * @return the received file's content hash as a BLOB key
+ * @return the received file's content hash
*
* @throws IOException
* thrown if an I/O error occurs while reading/writing data from/to the respective streams
*/
- private static BlobKey readFileFully(
- final InputStream inputStream, final File incomingFile, final byte[] buf, BlobKey.BlobType blobType)
+ private static byte[] readFileFully(
+ final InputStream inputStream, final File incomingFile, final byte[] buf)
throws IOException {
MessageDigest md = BlobUtils.createMessageDigest();
@@ -417,7 +415,7 @@ class BlobServerConnection extends Thread {
md.update(buf, 0, bytesExpected);
}
- return BlobKey.createKey(blobType, md.digest());
+ return md.digest();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index d8223c8..04f2cdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import javax.annotation.Nullable;
+
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
@@ -42,7 +43,6 @@ import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
-import java.util.concurrent.locks.Lock;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
@@ -422,7 +422,7 @@ public class BlobUtils {
/**
* Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
- * use.
+ * use (not thread-safe!).
*
* @param incomingFile
* temporary file created during transfer
@@ -432,8 +432,6 @@ public class BlobUtils {
* BLOB key identifying the file
* @param storageFile
* (local) file where the blob is/should be stored
- * @param writeLock
- * lock to acquire before doing the move
* @param log
* logger for debug information
* @param blobStore
@@ -444,9 +442,7 @@ public class BlobUtils {
*/
static void moveTempFileToStore(
File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile,
- Lock writeLock, Logger log, @Nullable BlobStore blobStore) throws IOException {
-
- writeLock.lock();
+ Logger log, @Nullable BlobStore blobStore) throws IOException {
try {
// first check whether the file already exists
@@ -483,8 +479,6 @@ public class BlobUtils {
if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobId);
}
-
- writeLock.unlock();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
index 2ad8f72..40732fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
@@ -42,4 +42,16 @@ public final class PermanentBlobKey extends BlobKey {
PermanentBlobKey(byte[] key) {
super(BlobType.PERMANENT_BLOB, key);
}
+
+ /**
+ * Constructs a new BLOB key from the given byte array.
+ *
+ * @param key
+ * the actual key data
+ * @param random
+ * the random component of the key
+ */
+ PermanentBlobKey(byte[] key, byte[] random) {
+ super(BlobType.PERMANENT_BLOB, key, random);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
index 43e0f5f..15a9637 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
@@ -42,4 +42,16 @@ public final class TransientBlobKey extends BlobKey {
TransientBlobKey(byte[] key) {
super(BlobType.TRANSIENT_BLOB, key);
}
+
+ /**
+ * Constructs a new BLOB key from the given byte array.
+ *
+ * @param key
+ * the actual key data
+ * @param random
+ * the random component of the key
+ */
+ TransientBlobKey(byte[] key, byte[] random) {
+ super(BlobType.TRANSIENT_BLOB, key, random);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index 54725e1..cf5bfcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -190,6 +190,9 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
//delete previous log file, if it is different than the current one
HashMap<String, TransientBlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : lastSubmittedStdout;
if (lastSubmittedFile.containsKey(taskManagerID)) {
+ // the BlobKey will almost certainly be different but the old file
+ // may not exist anymore so we cannot rely on it and need to
+ // download the new file anyway, even if the hashes match
if (!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) {
if (!blobCache.deleteFromCache(lastSubmittedFile.get(taskManagerID))) {
throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerID + '.'));
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
index a83d100..16ba020 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
@@ -51,7 +51,6 @@ import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -87,6 +86,12 @@ public class BlobCacheDeleteTest extends TestLogger {
testDelete(new JobID(), new JobID());
}
+ @Test
+ public void testDeleteTransient5() throws IOException {
+ JobID jobId = new JobID();
+ testDelete(jobId, jobId);
+ }
+
/**
* Uploads a (different) byte array for each of the given jobs and verifies that deleting one of
* them (via the {@link BlobCacheService}) does not influence the other.
@@ -98,7 +103,6 @@ public class BlobCacheDeleteTest extends TestLogger {
*/
private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2)
throws IOException {
- final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2));
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -122,9 +126,10 @@ public class BlobCacheDeleteTest extends TestLogger {
// put two more BLOBs (same key, other key) for another job ID
TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB);
assertNotNull(key2a);
- assertEquals(key1, key2a);
+ BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a);
TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB);
assertNotNull(key2b);
+ BlobKeyTest.verifyKeyDifferentHashDifferent(key1, key2b);
// issue a DELETE request
assertTrue(delete(cache, jobId1, key1));
@@ -134,16 +139,15 @@ public class BlobCacheDeleteTest extends TestLogger {
// delete on server so that the cache cannot re-download
assertTrue(server.deleteInternal(jobId1, key1));
verifyDeleted(cache, jobId1, key1);
- // deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different
- if (!sameJobId) {
- verifyContents(server, jobId2, key2a, data);
- }
+ // deleting one BLOB should not affect another BLOB with a different key
+ // (and keys are always different now)
+ verifyContents(server, jobId2, key2a, data);
verifyContents(server, jobId2, key2b, data2);
// delete first file of second job
assertTrue(delete(cache, jobId2, key2a));
- // delete only works on local cache (unless already deleted - key1 == key2a)!
- assertTrue(sameJobId || server.getStorageLocation(jobId2, key2a).exists());
+ // delete only works on local cache
+ assertTrue(server.getStorageLocation(jobId2, key2a).exists());
// delete on server so that the cache cannot re-download
assertTrue(server.deleteInternal(jobId2, key2a));
verifyDeleted(cache, jobId2, key2a);
@@ -151,7 +155,7 @@ public class BlobCacheDeleteTest extends TestLogger {
// delete second file of second job
assertTrue(delete(cache, jobId2, key2b));
- // delete only works on local cache (unless already deleted - key1 == key2a)!
+ // delete only works on local cache
assertTrue(server.getStorageLocation(jobId2, key2b).exists());
// delete on server so that the cache cannot re-download
assertTrue(server.deleteInternal(jobId2, key2b));
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
index bed27d8..c760d04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
@@ -40,7 +40,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.AccessDeniedException;
-import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -57,6 +56,7 @@ import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventu
import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
@@ -152,37 +152,37 @@ public class BlobCacheGetTest extends TestLogger {
// add the same data under a second jobId
BlobKey key2 = put(server, jobId2, data, blobType);
- assertNotNull(key);
- assertEquals(key, key2);
+ assertNotNull(key2);
+ verifyKeyDifferentHashEquals(key, key2);
// request for jobId2 should succeed
- get(cache, jobId2, key);
+ get(cache, jobId2, key2);
// request for jobId1 should still fail
verifyDeleted(cache, jobId1, key);
if (blobType == PERMANENT_BLOB) {
// still existing on server
- assertTrue(server.getStorageLocation(jobId2, key).exists());
+ assertTrue(server.getStorageLocation(jobId2, key2).exists());
// delete jobId2 on cache
- blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+ blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key2);
assertTrue(blobFile.delete());
// try to retrieve again
- get(cache, jobId2, key);
+ get(cache, jobId2, key2);
// delete on cache and server, verify that it is not accessible anymore
- blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+ blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key2);
assertTrue(blobFile.delete());
- blobFile = server.getStorageLocation(jobId2, key);
+ blobFile = server.getStorageLocation(jobId2, key2);
assertTrue(blobFile.delete());
- verifyDeleted(cache, jobId2, key);
+ verifyDeleted(cache, jobId2, key2);
} else {
// deleted eventually on the server by the GET request above
- verifyDeletedEventually(server, jobId2, key);
+ verifyDeletedEventually(server, jobId2, key2);
// delete jobId2 on cache
- blobFile = cache.getTransientBlobService().getStorageLocation(jobId2, key);
+ blobFile = cache.getTransientBlobService().getStorageLocation(jobId2, key2);
assertTrue(blobFile.delete());
// verify that it is not accessible anymore
- verifyDeleted(cache, jobId2, key);
+ verifyDeleted(cache, jobId2, key2);
}
}
}
@@ -548,11 +548,6 @@ public class BlobCacheGetTest extends TestLogger {
final byte[] data = {1, 2, 3, 4, 99, 42};
- MessageDigest md = BlobUtils.createMessageDigest();
-
- // create the correct blob key by hashing our input data
- final BlobKey blobKey = BlobKey.createKey(blobType, md.digest(data));
-
final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
try (
@@ -563,7 +558,7 @@ public class BlobCacheGetTest extends TestLogger {
server.start();
// upload data first
- assertEquals(blobKey, put(server, jobId, data, blobType));
+ final BlobKey blobKey = put(server, jobId, data, blobType);
// now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
for (int i = 0; i < numberConcurrentGetOperations; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
index aa23c80..56258c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
@@ -58,6 +58,7 @@ import java.util.function.Supplier;
import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.BlockingInputStream;
@@ -275,6 +276,11 @@ public class BlobCachePutTest extends TestLogger {
BlobKey key1a = put(cache, jobId1, data, blobType);
assertNotNull(key1a);
verifyType(blobType, key1a);
+ // second upload of same data should yield a different BlobKey
+ BlobKey key1a2 = put(cache, jobId1, data, blobType);
+ assertNotNull(key1a2);
+ verifyType(blobType, key1a2);
+ verifyKeyDifferentHashEquals(key1a, key1a2);
BlobKey key1b = put(cache, jobId1, data2, blobType);
assertNotNull(key1b);
@@ -282,19 +288,23 @@ public class BlobCachePutTest extends TestLogger {
// files should be available on the server
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
BlobKey key2a = put(cache, jobId2, data, blobType);
assertNotNull(key2a);
- assertEquals(key1a, key2a);
+ verifyType(blobType, key2a);
+ verifyKeyDifferentHashEquals(key1a, key2a);
BlobKey key2b = put(cache, jobId2, data2, blobType);
assertNotNull(key2b);
- assertEquals(key1b, key2b);
+ verifyType(blobType, key2b);
+ verifyKeyDifferentHashEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
verifyContents(server, jobId2, key2a, data);
verifyContents(server, jobId2, key2b, data2);
@@ -382,26 +392,32 @@ public class BlobCachePutTest extends TestLogger {
TransientBlobKey key1a =
(TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data), TRANSIENT_BLOB);
assertNotNull(key1a);
+ // second upload of same data should yield a different BlobKey
+ BlobKey key1a2 = put(cache, jobId1, new ByteArrayInputStream(data), TRANSIENT_BLOB);
+ assertNotNull(key1a2);
+ verifyKeyDifferentHashEquals(key1a, key1a2);
TransientBlobKey key1b = (TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
assertNotNull(key1b);
// files should be available on the server
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
TransientBlobKey key2a =
(TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data), TRANSIENT_BLOB);
assertNotNull(key2a);
- assertEquals(key1a, key2a);
+ verifyKeyDifferentHashEquals(key1a, key2a);
TransientBlobKey key2b = (TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
assertNotNull(key2b);
- assertEquals(key1b, key2b);
+ verifyKeyDifferentHashEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
verifyContents(server, jobId2, key2a, data);
verifyContents(server, jobId2, key2b, data2);
@@ -486,6 +502,10 @@ public class BlobCachePutTest extends TestLogger {
TransientBlobKey key1a =
(TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
assertNotNull(key1a);
+ // second upload of same data should yield a different BlobKey
+ BlobKey key1a2 = put(cache, jobId1, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
+ assertNotNull(key1a2);
+ verifyKeyDifferentHashEquals(key1a, key1a2);
TransientBlobKey key1b =
(TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
@@ -493,21 +513,23 @@ public class BlobCachePutTest extends TestLogger {
// files should be available on the server
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
TransientBlobKey key2a =
(TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
assertNotNull(key2a);
- assertEquals(key1a, key2a);
+ verifyKeyDifferentHashEquals(key1a, key2a);
TransientBlobKey key2b =
(TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
assertNotNull(key2b);
- assertEquals(key1b, key2b);
+ verifyKeyDifferentHashEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
verifyContents(server, jobId2, key2a, data);
verifyContents(server, jobId2, key2b, data2);
@@ -858,7 +880,8 @@ public class BlobCachePutTest extends TestLogger {
// make sure that all blob keys are the same
while (blobKeyIterator.hasNext()) {
- assertEquals(blobKey, blobKeyIterator.next());
+ // check for unique BlobKey, but should have same hash
+ verifyKeyDifferentHashEquals(blobKey, blobKeyIterator.next());
}
// check the uploaded file's contents
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
index 1a3f161..e275949 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
@@ -38,13 +38,11 @@ import java.util.Random;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashDifferent;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -131,10 +129,8 @@ public class BlobCacheRecoveryTest extends TestLogger {
// put non-HA data
nonHAKey = put(cache0, jobId[0], expected2, TRANSIENT_BLOB);
- assertNotEquals(keys[0], nonHAKey);
- assertThat(keys[0].getHash(), not(equalTo(nonHAKey.getHash())));
- assertNotEquals(keys[1], nonHAKey);
- assertThat(keys[1].getHash(), equalTo(nonHAKey.getHash()));
+ verifyKeyDifferentHashDifferent(keys[0], nonHAKey);
+ verifyKeyDifferentHashEquals(keys[1], nonHAKey);
// check that the storage directory exists
final Path blobServerPath = new Path(storagePath, "blob");
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 0e97604..9e4f4b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -47,6 +47,7 @@ import java.util.Random;
import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -113,15 +114,13 @@ public class BlobClientTest extends TestLogger {
*
* @param file
* the file to prepare for the unit tests
- * @param blobType
- * whether the BLOB should become permanent or transient
*
* @return the BLOB key of the prepared file
*
* @throws IOException
* thrown if an I/O error occurs while writing to the test file
*/
- private static BlobKey prepareTestFile(File file, BlobKey.BlobType blobType) throws IOException {
+ private static byte[] prepareTestFile(File file) throws IOException {
MessageDigest md = BlobUtils.createMessageDigest();
@@ -145,7 +144,7 @@ public class BlobClientTest extends TestLogger {
}
}
- return BlobKey.createKey(blobType, md.digest());
+ return md.digest();
}
/**
@@ -254,35 +253,38 @@ public class BlobClientTest extends TestLogger {
byte[] testBuffer = createTestBuffer();
MessageDigest md = BlobUtils.createMessageDigest();
md.update(testBuffer);
- BlobKey origKey = BlobKey.createKey(blobType, md.digest());
+ byte[] digest = md.digest();
InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
client = new BlobClient(serverAddress, getBlobClientConfig());
JobID jobId = new JobID();
- BlobKey receivedKey;
// Store the data (job-unrelated)
+ BlobKey receivedKey1 = null;
if (blobType == TRANSIENT_BLOB) {
- receivedKey = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType);
- assertEquals(origKey, receivedKey);
+ receivedKey1 = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType);
+ assertArrayEquals(digest, receivedKey1.getHash());
}
// try again with a job-related BLOB:
- receivedKey = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType);
- assertEquals(origKey, receivedKey);
+ BlobKey receivedKey2 = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType);
+ assertArrayEquals(digest, receivedKey2.getHash());
+ if (blobType == TRANSIENT_BLOB) {
+ verifyKeyDifferentHashEquals(receivedKey1, receivedKey2);
+ }
// Retrieve the data (job-unrelated)
if (blobType == TRANSIENT_BLOB) {
- validateGetAndClose(client.getInternal(null, receivedKey), testBuffer);
+ validateGetAndClose(client.getInternal(null, receivedKey1), testBuffer);
// transient BLOBs should be deleted from the server, eventually
- verifyDeletedEventually(getBlobServer(), null, receivedKey);
+ verifyDeletedEventually(getBlobServer(), null, receivedKey1);
}
// job-related
- validateGetAndClose(client.getInternal(jobId, receivedKey), testBuffer);
+ validateGetAndClose(client.getInternal(jobId, receivedKey2), testBuffer);
if (blobType == TRANSIENT_BLOB) {
// transient BLOBs should be deleted from the server, eventually
- verifyDeletedEventually(getBlobServer(), jobId, receivedKey);
+ verifyDeletedEventually(getBlobServer(), jobId, receivedKey2);
}
// Check reaction to invalid keys for job-unrelated blobs
@@ -342,41 +344,42 @@ public class BlobClientTest extends TestLogger {
throws IOException, InterruptedException {
File testFile = temporaryFolder.newFile();
- BlobKey origKey = prepareTestFile(testFile, blobType);
+ byte[] digest = prepareTestFile(testFile);
InputStream is = null;
try (BlobClient client = new BlobClient(new InetSocketAddress("localhost", getBlobServer().getPort()), getBlobClientConfig())) {
JobID jobId = new JobID();
- BlobKey receivedKey;
+ BlobKey receivedKey1 = null;
// Store the data (job-unrelated)
if (blobType == TRANSIENT_BLOB) {
is = new FileInputStream(testFile);
- receivedKey = client.putInputStream(null, is, blobType);
- assertEquals(origKey, receivedKey);
+ receivedKey1 = client.putInputStream(null, is, blobType);
+ assertArrayEquals(digest, receivedKey1.getHash());
}
// try again with a job-related BLOB:
is = new FileInputStream(testFile);
- receivedKey = client.putInputStream(jobId, is, blobType);
- assertEquals(origKey, receivedKey);
+ BlobKey receivedKey2 = client.putInputStream(jobId, is, blobType);
is.close();
is = null;
// Retrieve the data (job-unrelated)
if (blobType == TRANSIENT_BLOB) {
- validateGetAndClose(client.getInternal(null, receivedKey), testFile);
+ verifyKeyDifferentHashEquals(receivedKey1, receivedKey2);
+
+ validateGetAndClose(client.getInternal(null, receivedKey1), testFile);
// transient BLOBs should be deleted from the server, eventually
- verifyDeletedEventually(getBlobServer(), null, receivedKey);
+ verifyDeletedEventually(getBlobServer(), null, receivedKey1);
}
// job-related
- validateGetAndClose(client.getInternal(jobId, receivedKey), testFile);
+ validateGetAndClose(client.getInternal(jobId, receivedKey2), testFile);
if (blobType == TRANSIENT_BLOB) {
// transient BLOBs should be deleted from the server, eventually
- verifyDeletedEventually(getBlobServer(), jobId, receivedKey);
+ verifyDeletedEventually(getBlobServer(), jobId, receivedKey2);
}
} finally {
if (is != null) {
@@ -463,7 +466,7 @@ public class BlobClientTest extends TestLogger {
static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {
final File testFile = File.createTempFile("testfile", ".dat");
testFile.deleteOnExit();
- prepareTestFile(testFile, PERMANENT_BLOB);
+ prepareTestFile(testFile);
InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 49f4fc2..ae538aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -29,14 +30,17 @@ import java.io.IOException;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -46,21 +50,35 @@ public final class BlobKeyTest extends TestLogger {
/**
* The first key array to be used during the unit tests.
*/
- private static final byte[] KEY_ARRAY_1 = new byte[20];
+ private static final byte[] KEY_ARRAY_1 = new byte[BlobKey.SIZE];
/**
* The second key array to be used during the unit tests.
*/
- private static final byte[] KEY_ARRAY_2 = new byte[20];
+ private static final byte[] KEY_ARRAY_2 = new byte[BlobKey.SIZE];
+
+ /**
+ * First byte array to use for the random component of a {@link BlobKey}.
+ */
+ private static final byte[] RANDOM_ARRAY_1 = new byte[AbstractID.SIZE];
+
+ /**
+ * Second byte array to use for the random component of a {@link BlobKey}.
+ */
+ private static final byte[] RANDOM_ARRAY_2 = new byte[AbstractID.SIZE];
/*
- * Initialize the key array.
+ * Initialize the key and random arrays.
*/
static {
for (int i = 0; i < KEY_ARRAY_1.length; ++i) {
KEY_ARRAY_1[i] = (byte) i;
KEY_ARRAY_2[i] = (byte) (i + 1);
}
+ for (int i = 0; i < RANDOM_ARRAY_1.length; ++i) {
+ RANDOM_ARRAY_1[i] = (byte) i;
+ RANDOM_ARRAY_2[i] = (byte) (i + 1);
+ }
}
@Test
@@ -89,7 +107,7 @@ public final class BlobKeyTest extends TestLogger {
* Tests the serialization/deserialization of BLOB keys.
*/
private void testSerialization(BlobKey.BlobType blobType) throws Exception {
- final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+ final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
final BlobKey k2 = CommonTestUtils.createCopySerializable(k1);
assertEquals(k1, k2);
assertEquals(k1.hashCode(), k2.hashCode());
@@ -107,16 +125,25 @@ public final class BlobKeyTest extends TestLogger {
}
/**
- * Tests the equals method.
+ * Tests the {@link BlobKey#equals(Object)} and {@link BlobKey#hashCode()} methods.
*/
private void testEquals(BlobKey.BlobType blobType) {
- final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
- final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
- final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
+ final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
+ final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
+ final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, RANDOM_ARRAY_1);
+ final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_2);
assertTrue(k1.equals(k2));
assertTrue(k2.equals(k1));
+ assertEquals(k1.hashCode(), k2.hashCode());
assertFalse(k1.equals(k3));
assertFalse(k3.equals(k1));
+ assertFalse(k1.equals(k4));
+ assertFalse(k4.equals(k1));
+
+ //noinspection ObjectEqualsNull
+ assertFalse(k1.equals(null));
+ //noinspection EqualsBetweenInconvertibleTypes
+ assertFalse(k1.equals(this));
}
/**
@@ -124,8 +151,8 @@ public final class BlobKeyTest extends TestLogger {
*/
@Test
public void testEqualsDifferentBlobType() {
- final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
- final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+ final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1);
+ final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1);
assertFalse(k1.equals(k2));
assertFalse(k2.equals(k1));
}
@@ -144,19 +171,22 @@ public final class BlobKeyTest extends TestLogger {
* Tests the compares method.
*/
private void testCompares(BlobKey.BlobType blobType) {
- final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
- final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
- final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
+ final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
+ final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
+ final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, RANDOM_ARRAY_1);
+ final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_2);
assertThat(k1.compareTo(k2), is(0));
assertThat(k2.compareTo(k1), is(0));
assertThat(k1.compareTo(k3), lessThan(0));
+ assertThat(k1.compareTo(k4), lessThan(0));
assertThat(k3.compareTo(k1), greaterThan(0));
+ assertThat(k4.compareTo(k1), greaterThan(0));
}
@Test
public void testComparesDifferentBlobType() {
- final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
- final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+ final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1);
+ final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1, RANDOM_ARRAY_1);
assertThat(k1.compareTo(k2), greaterThan(0));
assertThat(k2.compareTo(k1), lessThan(0));
}
@@ -175,7 +205,7 @@ public final class BlobKeyTest extends TestLogger {
* Test the serialization/deserialization using input/output streams.
*/
private void testStreams(BlobKey.BlobType blobType) throws IOException {
- final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+ final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, RANDOM_ARRAY_1);
final ByteArrayOutputStream baos = new ByteArrayOutputStream(20);
k1.writeToOutputStream(baos);
@@ -188,6 +218,28 @@ public final class BlobKeyTest extends TestLogger {
}
/**
+ * Verifies that the two given key's are different in total but share the same hash.
+ *
+ * @param key1 first blob key
+ * @param key2 second blob key
+ */
+ static void verifyKeyDifferentHashEquals(BlobKey key1, BlobKey key2) {
+ assertNotEquals(key1, key2);
+ assertThat(key1.getHash(), equalTo(key2.getHash()));
+ }
+
+ /**
+ * Verifies that the two given key's are different in total and also have different hashes.
+ *
+ * @param key1 first blob key
+ * @param key2 second blob key
+ */
+ static void verifyKeyDifferentHashDifferent(BlobKey key1, BlobKey key2) {
+ assertNotEquals(key1, key2);
+ assertThat(key1.getHash(), not(equalTo(key2.getHash())));
+ }
+
+ /**
* Verifies that the given <tt>key</tt> is of an expected type.
*
* @param expected the type the key should have
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index a110d4a..fde21ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -47,10 +47,11 @@ import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -86,6 +87,12 @@ public class BlobServerDeleteTest extends TestLogger {
testDeleteTransient(new JobID(), new JobID());
}
+ @Test
+ public void testDeleteTransient5() throws IOException {
+ JobID jobId = new JobID();
+ testDeleteTransient(jobId, jobId);
+ }
+
/**
* Uploads a (different) byte array for each of the given jobs and verifies that deleting one of
* them (via the {@link BlobServer}) does not influence the other.
@@ -97,7 +104,6 @@ public class BlobServerDeleteTest extends TestLogger {
*/
private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2)
throws IOException {
- final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2));
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -118,7 +124,7 @@ public class BlobServerDeleteTest extends TestLogger {
// put two more BLOBs (same key, other key) for another job ID
TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB);
assertNotNull(key2a);
- assertEquals(key1, key2a);
+ verifyKeyDifferentHashEquals(key1, key2a);
TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB);
assertNotNull(key2b);
@@ -126,10 +132,9 @@ public class BlobServerDeleteTest extends TestLogger {
assertTrue(delete(server, jobId1, key1));
verifyDeleted(server, jobId1, key1);
- // deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different
- if (!sameJobId) {
- verifyContents(server, jobId2, key2a, data);
- }
+ // deleting a one BLOB should not affect another BLOB with a different key
+ // (and keys are always different now)
+ verifyContents(server, jobId2, key2a, data);
verifyContents(server, jobId2, key2b, data2);
// delete first file of second job
@@ -284,7 +289,7 @@ public class BlobServerDeleteTest extends TestLogger {
BlobKey key1a = put(server, jobId1, data, blobType);
BlobKey key2 = put(server, jobId2, data, blobType);
- assertEquals(key1a, key2);
+ assertArrayEquals(key1a.getHash(), key2.getHash());
BlobKey key1b = put(server, jobId1, data2, blobType);
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 4927279..e3b5309 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -42,7 +42,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.NoSuchFileException;
-import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -58,6 +57,7 @@ import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX;
import static org.junit.Assert.assertArrayEquals;
@@ -141,18 +141,18 @@ public class BlobServerGetTest extends TestLogger {
// add the same data under a second jobId
BlobKey key2 = put(server, jobId2, data, blobType);
- assertNotNull(key);
- assertEquals(key, key2);
+ assertNotNull(key2);
+ verifyKeyDifferentHashEquals(key, key2);
// request for jobId2 should succeed
- get(server, jobId2, key);
+ get(server, jobId2, key2);
// request for jobId1 should still fail
verifyDeleted(server, jobId1, key);
// same checks as for jobId1 but for jobId2 should also work:
- blobFile = server.getStorageLocation(jobId2, key);
+ blobFile = server.getStorageLocation(jobId2, key2);
assertTrue(blobFile.delete());
- verifyDeleted(server, jobId2, key);
+ verifyDeleted(server, jobId2, key2);
}
}
@@ -373,11 +373,6 @@ public class BlobServerGetTest extends TestLogger {
final byte[] data = {1, 2, 3, 4, 99, 42};
- MessageDigest md = BlobUtils.createMessageDigest();
-
- // create the correct blob key by hashing our input data
- final BlobKey blobKey = BlobKey.createKey(blobType, md.digest(data));
-
doAnswer(
new Answer() {
@Override
@@ -398,7 +393,7 @@ public class BlobServerGetTest extends TestLogger {
server.start();
// upload data first
- assertEquals(blobKey, put(server, jobId, data, blobType));
+ final BlobKey blobKey = put(server, jobId, data, blobType);
// now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
if (blobType == PERMANENT_BLOB) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index aefd0a3..dcf49a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -62,6 +62,7 @@ import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -226,21 +227,26 @@ public class BlobServerPutTest extends TestLogger {
// put data for jobId1 and verify
BlobKey key1a = put(server, jobId1, data, blobType);
assertNotNull(key1a);
+ // second upload of same data should yield a different BlobKey
+ BlobKey key1a2 = put(server, jobId1, data, blobType);
+ assertNotNull(key1a2);
+ verifyKeyDifferentHashEquals(key1a, key1a2);
BlobKey key1b = put(server, jobId1, data2, blobType);
assertNotNull(key1b);
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
BlobKey key2a = put(server, jobId2, data, blobType);
assertNotNull(key2a);
- assertEquals(key1a, key2a);
+ verifyKeyDifferentHashEquals(key1a, key2a);
BlobKey key2b = put(server, jobId2, data2, blobType);
assertNotNull(key2b);
- assertEquals(key1b, key2b);
+ verifyKeyDifferentHashEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
verifyContents(server, jobId2, key2a, data);
@@ -249,6 +255,7 @@ public class BlobServerPutTest extends TestLogger {
// verify the accessibility and the BLOB contents one more time (transient BLOBs should
// not be deleted here)
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
verifyContents(server, jobId2, key2a, data);
verifyContents(server, jobId2, key2b, data2);
@@ -311,21 +318,26 @@ public class BlobServerPutTest extends TestLogger {
// put data for jobId1 and verify
BlobKey key1a = put(server, jobId1, new ByteArrayInputStream(data), blobType);
assertNotNull(key1a);
+ // second upload of same data should yield a different BlobKey
+ BlobKey key1a2 = put(server, jobId1, new ByteArrayInputStream(data), blobType);
+ assertNotNull(key1a2);
+ verifyKeyDifferentHashEquals(key1a, key1a2);
BlobKey key1b = put(server, jobId1, new ByteArrayInputStream(data2), blobType);
assertNotNull(key1b);
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
BlobKey key2a = put(server, jobId2, new ByteArrayInputStream(data), blobType);
assertNotNull(key2a);
- assertEquals(key1a, key2a);
+ verifyKeyDifferentHashEquals(key1a, key2a);
BlobKey key2b = put(server, jobId2, new ByteArrayInputStream(data2), blobType);
assertNotNull(key2b);
- assertEquals(key1b, key2b);
+ verifyKeyDifferentHashEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
verifyContents(server, jobId2, key2a, data);
@@ -334,6 +346,7 @@ public class BlobServerPutTest extends TestLogger {
// verify the accessibility and the BLOB contents one more time (transient BLOBs should
// not be deleted here)
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
verifyContents(server, jobId2, key2a, data);
verifyContents(server, jobId2, key2b, data2);
@@ -396,21 +409,26 @@ public class BlobServerPutTest extends TestLogger {
// put data for jobId1 and verify
BlobKey key1a = put(server, jobId1, new ChunkedInputStream(data, 19), blobType);
assertNotNull(key1a);
+ // second upload of same data should yield a different BlobKey
+ BlobKey key1a2 = put(server, jobId1, new ChunkedInputStream(data, 19), blobType);
+ assertNotNull(key1a2);
+ verifyKeyDifferentHashEquals(key1a, key1a2);
BlobKey key1b = put(server, jobId1, new ChunkedInputStream(data2, 19), blobType);
assertNotNull(key1b);
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
// now put data for jobId2 and verify that both are ok
BlobKey key2a = put(server, jobId2, new ChunkedInputStream(data, 19), blobType);
assertNotNull(key2a);
- assertEquals(key1a, key2a);
+ verifyKeyDifferentHashEquals(key1a, key2a);
BlobKey key2b = put(server, jobId2, new ChunkedInputStream(data2, 19), blobType);
assertNotNull(key2b);
- assertEquals(key1b, key2b);
+ verifyKeyDifferentHashEquals(key1b, key2b);
// verify the accessibility and the BLOB contents
verifyContents(server, jobId2, key2a, data);
@@ -419,6 +437,7 @@ public class BlobServerPutTest extends TestLogger {
// verify the accessibility and the BLOB contents one more time (transient BLOBs should
// not be deleted here)
verifyContents(server, jobId1, key1a, data);
+ verifyContents(server, jobId1, key1a2, data);
verifyContents(server, jobId1, key1b, data2);
verifyContents(server, jobId2, key2a, data);
verifyContents(server, jobId2, key2b, data2);
@@ -700,7 +719,7 @@ public class BlobServerPutTest extends TestLogger {
// make sure that all blob keys are the same
while (blobKeyIterator.hasNext()) {
- assertEquals(blobKey, blobKeyIterator.next());
+ verifyKeyDifferentHashEquals(blobKey, blobKeyIterator.next());
}
// check the uploaded file's contents
http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
index 35575b5..5b8d0e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
@@ -39,12 +39,10 @@ import java.util.Random;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -126,8 +124,7 @@ public class BlobServerRecoveryTest extends TestLogger {
// put non-HA data
nonHAKey = put(server0, jobId[0], expected2, TRANSIENT_BLOB);
- assertNotEquals(keys[1], nonHAKey);
- assertThat(keys[1].getHash(), equalTo(nonHAKey.getHash()));
+ verifyKeyDifferentHashEquals(keys[1], nonHAKey);
// check that the storage directory exists
final Path blobServerPath = new Path(storagePath, "blob");