You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/17 06:18:03 UTC

[5/5] flink git commit: [FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion

[FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion

This commit introduces a BlobServer#readWriteLock in order to synchronize file creation
and deletion operations in BlobServerConnection and BlobServer. This prevents multiple
put and delete operations from interfering with each other and with get operations.

The get operations are synchronized using the read lock, so that multiple get operations
can still be served in parallel.

Add Get and Delete operation tests

This closes #3888.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60873b0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60873b0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60873b0c

Branch: refs/heads/release-1.3
Commit: 60873b0c57be7b83d55af179b4be4defc46d80de
Parents: e3ea89a
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 10 17:38:49 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 17 08:17:02 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobClient.java   |   2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  29 ++-
 .../runtime/blob/BlobServerConnection.java      | 240 +++++++++++++++----
 .../runtime/blob/BlobServerDeleteTest.java      |  73 +++++-
 .../flink/runtime/blob/BlobServerGetTest.java   | 115 ++++++++-
 .../flink/runtime/blob/BlobServerPutTest.java   | 109 ++++++++-
 .../src/test/resources/log4j-test.properties    |   2 +-
 7 files changed, 509 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index ea90f54..3da7ef2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -538,7 +538,7 @@ public final class BlobClient implements Closeable {
 			throw new IOException("Server side error: " + cause.getMessage(), cause);
 		}
 		else {
-			throw new IOException("Unrecognized response");
+			throw new IOException("Unrecognized response: " + response + '.');
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index a006981..ba04c41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -44,6 +44,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -85,6 +87,9 @@ public class BlobServer extends Thread implements BlobService {
 	/** The maximum number of concurrent connections */
 	private final int maxConnections;
 
+	/** Lock guarding concurrent file accesses */
+	private final ReadWriteLock readWriteLock;
+
 	/**
 	 * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if
 	 * the configured high availability mode does not equal{@link HighAvailabilityMode#NONE})
@@ -104,6 +109,7 @@ public class BlobServer extends Thread implements BlobService {
 	public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
 		this.blobServiceConfiguration = checkNotNull(config);
 		this.blobStore = checkNotNull(blobStore);
+		this.readWriteLock = new ReentrantReadWriteLock();
 
 		// configure and create the storage directory
 		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
@@ -237,6 +243,13 @@ public class BlobServer extends Thread implements BlobService {
 		return blobStore;
 	}
 
+	/**
+	 * Returns the lock used to guard file accesses
+	 */
+	public ReadWriteLock getReadWriteLock() {
+		return readWriteLock;
+	}
+
 	@Override
 	public void run() {
 		try {
@@ -397,13 +410,19 @@ public class BlobServer extends Thread implements BlobService {
 	public void delete(BlobKey key) throws IOException {
 		final File localFile = BlobUtils.getStorageLocation(storageDir, key);
 
-		if (localFile.exists()) {
-			if (!localFile.delete()) {
-				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+		readWriteLock.writeLock().lock();
+
+		try {
+			if (localFile.exists()) {
+				if (!localFile.delete()) {
+					LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+				}
 			}
-		}
 
-		blobStore.delete(key);
+			blobStore.delete(key);
+		} finally {
+			readWriteLock.writeLock().unlock();
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 13a90c6..a76dbd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.blob;
 
-import com.google.common.io.Files;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
@@ -33,7 +32,11 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
 import java.security.MessageDigest;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
@@ -67,6 +70,12 @@ class BlobServerConnection extends Thread {
 	/** The HA blob store. */
 	private final BlobStore blobStore;
 
+	/** Write lock to synchronize file accesses */
+	private final Lock writeLock;
+
+	/** Read lock to synchronize file accesses */
+	private final Lock readLock;
+
 	/**
 	 * Creates a new BLOB connection for a client request
 	 * 
@@ -74,7 +83,7 @@ class BlobServerConnection extends Thread {
 	 * @param blobServer The BLOB server.
 	 */
 	BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
-		super("BLOB connection for " + clientSocket.getRemoteSocketAddress().toString());
+		super("BLOB connection for " + clientSocket.getRemoteSocketAddress());
 		setDaemon(true);
 
 		if (blobServer == null) {
@@ -84,6 +93,11 @@ class BlobServerConnection extends Thread {
 		this.clientSocket = clientSocket;
 		this.blobServer = blobServer;
 		this.blobStore = blobServer.getBlobStore();
+
+		ReadWriteLock readWriteLock = blobServer.getReadWriteLock();
+
+		this.writeLock = readWriteLock.writeLock();
+		this.readLock = readWriteLock.readLock();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -178,8 +192,13 @@ class BlobServerConnection extends Thread {
 		 */
 
 		File blobFile;
+		int contentAddressable = -1;
+		JobID jobId = null;
+		String key = null;
+		BlobKey blobKey = null;
+
 		try {
-			final int contentAddressable = inputStream.read();
+			contentAddressable = inputStream.read();
 
 			if (contentAddressable < 0) {
 				throw new EOFException("Premature end of GET request");
@@ -189,37 +208,18 @@ class BlobServerConnection extends Thread {
 				byte[] jidBytes = new byte[JobID.SIZE];
 				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
 
-				JobID jobID = JobID.fromByteArray(jidBytes);
-				String key = readKey(buf, inputStream);
-				blobFile = this.blobServer.getStorageLocation(jobID, key);
-
-				if (!blobFile.exists()) {
-					blobStore.get(jobID, key, blobFile);
-				}
+				jobId = JobID.fromByteArray(jidBytes);
+				key = readKey(buf, inputStream);
+				blobFile = blobServer.getStorageLocation(jobId, key);
 			}
 			else if (contentAddressable == CONTENT_ADDRESSABLE) {
-				final BlobKey key = BlobKey.readFromInputStream(inputStream);
-				blobFile = blobServer.getStorageLocation(key);
-
-				if (!blobFile.exists()) {
-					blobStore.get(key, blobFile);
-				}
+				blobKey = BlobKey.readFromInputStream(inputStream);
+				blobFile = blobServer.getStorageLocation(blobKey);
 			}
 			else {
-				throw new IOException("Unknown type of BLOB addressing.");
-			}
-
-			// Check if BLOB exists
-			if (!blobFile.exists()) {
-				throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
-			}
-
-			if (blobFile.length() > Integer.MAX_VALUE) {
-				throw new IOException("BLOB size exceeds the maximum size (2 GB).");
+				throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
 			}
 
-			outputStream.write(RETURN_OKAY);
-
 			// up to here, an error can give a good message
 		}
 		catch (Throwable t) {
@@ -235,8 +235,58 @@ class BlobServerConnection extends Thread {
 			return;
 		}
 
-		// from here on, we started sending data, so all we can do is close the connection when something happens
+		readLock.lock();
+
 		try {
+			try {
+				if (!blobFile.exists()) {
+					// first we have to release the read lock in order to acquire the write lock
+					readLock.unlock();
+					writeLock.lock();
+
+					try {
+						if (blobFile.exists()) {
+							LOG.debug("Blob file {} has downloaded from the BlobStore by a different connection.", blobFile);
+						} else {
+							if (contentAddressable == NAME_ADDRESSABLE) {
+								blobStore.get(jobId, key, blobFile);
+							} else if (contentAddressable == CONTENT_ADDRESSABLE) {
+								blobStore.get(blobKey, blobFile);
+							} else {
+								throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
+							}
+						}
+					} finally {
+						writeLock.unlock();
+					}
+
+					readLock.lock();
+
+					// Check if BLOB exists
+					if (!blobFile.exists()) {
+						throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
+					}
+				}
+
+				if (blobFile.length() > Integer.MAX_VALUE) {
+					throw new IOException("BLOB size exceeds the maximum size (2 GB).");
+				}
+
+				outputStream.write(RETURN_OKAY);
+			} catch (Throwable t) {
+				LOG.error("GET operation failed", t);
+				try {
+					writeErrorToStream(outputStream, t);
+				}
+				catch (IOException e) {
+					// since we are in an exception case, it means not much that we could not send the error
+					// ignore this
+				}
+				clientSocket.close();
+				return;
+			}
+
+			// from here on, we started sending data, so all we can do is close the connection when something happens
 			int blobLen = (int) blobFile.length();
 			writeLength(blobLen, outputStream);
 
@@ -251,14 +301,14 @@ class BlobServerConnection extends Thread {
 					bytesRemaining -= read;
 				}
 			}
-		}
-		catch (SocketException e) {
+		} catch (SocketException e) {
 			// happens when the other side disconnects
 			LOG.debug("Socket connection closed", e);
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			LOG.error("GET operation failed", t);
 			clientSocket.close();
+		} finally {
+			readLock.unlock();
 		}
 	}
 
@@ -328,21 +378,83 @@ class BlobServerConnection extends Thread {
 			fos.close();
 
 			if (contentAddressable == NAME_ADDRESSABLE) {
-				File storageFile = this.blobServer.getStorageLocation(jobID, key);
-				Files.move(incomingFile, storageFile);
-				incomingFile = null;
+				File storageFile = blobServer.getStorageLocation(jobID, key);
 
-				blobStore.put(storageFile, jobID, key);
+				writeLock.lock();
+
+				try {
+					// first check whether the file already exists
+					if (!storageFile.exists()) {
+						try {
+							// only move the file if it does not yet exist
+							Files.move(incomingFile.toPath(), storageFile.toPath());
+
+							incomingFile = null;
+
+						} catch (FileAlreadyExistsException ignored) {
+							LOG.warn("Detected concurrent file modifications. This should only happen if multiple" +
+								"BlobServer use the same storage directory.");
+							// we cannot be sure at this point whether the file has already been uploaded to the blob
+							// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
+							// to persist the blob. Otherwise we might not be able to recover the job.
+						}
+
+						// only the one moving the incoming file to its final destination is allowed to upload the
+						// file to the blob store
+						blobStore.put(storageFile, jobID, key);
+					}
+				} catch(IOException ioe) {
+					// we failed to either create the local storage file or to upload it --> try to delete the local file
+					// while still having the write lock
+					if (storageFile.exists() && !storageFile.delete()) {
+						LOG.warn("Could not delete the storage file.");
+					}
+
+					throw ioe;
+				} finally {
+					writeLock.unlock();
+				}
 
 				outputStream.write(RETURN_OKAY);
 			}
 			else {
 				BlobKey blobKey = new BlobKey(md.digest());
 				File storageFile = blobServer.getStorageLocation(blobKey);
-				Files.move(incomingFile, storageFile);
-				incomingFile = null;
 
-				blobStore.put(storageFile, blobKey);
+				writeLock.lock();
+
+				try {
+					// first check whether the file already exists
+					if (!storageFile.exists()) {
+						try {
+							// only move the file if it does not yet exist
+							Files.move(incomingFile.toPath(), storageFile.toPath());
+
+							incomingFile = null;
+
+						} catch (FileAlreadyExistsException ignored) {
+							LOG.warn("Detected concurrent file modifications. This should only happen if multiple" +
+								"BlobServer use the same storage directory.");
+							// we cannot be sure at this point whether the file has already been uploaded to the blob
+							// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
+							// to persist the blob. Otherwise we might not be able to recover the job.
+						}
+
+						// only the one moving the incoming file to its final destination is allowed to upload the
+						// file to the blob store
+						blobStore.put(storageFile, blobKey);
+					}
+				} catch(IOException ioe) {
+					// we failed to either create the local storage file or to upload it --> try to delete the local file
+					// while still having the write lock
+					if (storageFile.exists() && !storageFile.delete()) {
+						LOG.warn("Could not delete the storage file.");
+					}
+
+					throw ioe;
+				} finally {
+					writeLock.unlock();
+				}
 
 				// Return computed key to client for validation
 				outputStream.write(RETURN_OKAY);
@@ -397,12 +509,21 @@ class BlobServerConnection extends Thread {
 
 			if (type == CONTENT_ADDRESSABLE) {
 				BlobKey key = BlobKey.readFromInputStream(inputStream);
-				File blobFile = this.blobServer.getStorageLocation(key);
-				if (blobFile.exists() && !blobFile.delete()) {
-					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
-				}
+				File blobFile = blobServer.getStorageLocation(key);
+
+				writeLock.lock();
 
-				blobStore.delete(key);
+				try {
+					// we should make the local and remote file deletion atomic, otherwise we might risk not
+					// removing the remote file in case of a concurrent put operation
+					if (blobFile.exists() && !blobFile.delete()) {
+						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
+					}
+
+					blobStore.delete(key);
+				} finally {
+					writeLock.unlock();
+				}
 			}
 			else if (type == NAME_ADDRESSABLE) {
 				byte[] jidBytes = new byte[JobID.SIZE];
@@ -412,20 +533,37 @@ class BlobServerConnection extends Thread {
 				String key = readKey(buf, inputStream);
 
 				File blobFile = this.blobServer.getStorageLocation(jobID, key);
-				if (blobFile.exists() && !blobFile.delete()) {
-					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
-				}
 
-				blobStore.delete(jobID, key);
+				writeLock.lock();
+
+				try {
+					// we should make the local and remote file deletion atomic, otherwise we might risk not
+					// removing the remote file in case of a concurrent put operation
+					if (blobFile.exists() && !blobFile.delete()) {
+						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
+					}
+
+					blobStore.delete(jobID, key);
+				} finally {
+					writeLock.unlock();
+				}
 			}
 			else if (type == JOB_ID_SCOPE) {
 				byte[] jidBytes = new byte[JobID.SIZE];
 				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
 				JobID jobID = JobID.fromByteArray(jidBytes);
 
-				blobServer.deleteJobDirectory(jobID);
+				writeLock.lock();
+
+				try {
+					// we should make the local and remote file deletion atomic, otherwise we might risk not
+					// removing the remote file in case of a concurrent put operation
+					blobServer.deleteJobDirectory(jobID);
 
-				blobStore.deleteAll(jobID);
+					blobStore.deleteAll(jobID);
+				} finally {
+					writeLock.unlock();
+				}
 			}
 			else {
 				throw new IOException("Unrecognized addressing type: " + type);

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index e8e28a1..5e1d86e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -20,23 +20,35 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests how DELETE requests behave.
  */
-public class BlobServerDeleteTest {
+public class BlobServerDeleteTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
@@ -285,6 +297,65 @@ public class BlobServerDeleteTest {
 		}
 	}
 
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent delete operations don't interfere with each other.
+	 *
+	 * Note: The test checks that there cannot be two threads which have both checked whether a given blob file
+	 * exists and then one of them fails deleting it. Without the introduced lock, this situation can occasionally
+	 * happen and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is
+	 * most likely broken.
+	 */
+	@Test
+	public void testConcurrentDeleteOperations() throws IOException, ExecutionException, InterruptedException {
+		final Configuration configuration = new Configuration();
+		final BlobStore blobStore = mock(BlobStore.class);
+
+		final int concurrentDeleteOperations = 3;
+		final ExecutorService executor = Executors.newFixedThreadPool(concurrentDeleteOperations);
+
+		final List<Future<Void>> deleteFutures = new ArrayList<>(concurrentDeleteOperations);
+
+		final byte[] data = {1, 2, 3};
+
+		try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+
+			final BlobKey blobKey;
+
+			try (BlobClient client = blobServer.createClient()) {
+				blobKey = client.put(data);
+			}
+
+			assertTrue(blobServer.getStorageLocation(blobKey).exists());
+
+			for (int i = 0; i < concurrentDeleteOperations; i++) {
+				Future<Void> deleteFuture = FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						try (BlobClient blobClient = blobServer.createClient()) {
+							blobClient.delete(blobKey);
+						}
+
+						return null;
+					}
+				}, executor);
+
+				deleteFutures.add(deleteFuture);
+			}
+
+			Future<Void> waitFuture = FutureUtils.waitForAll(deleteFutures);
+
+			// make sure all delete operations have completed successfully
+			// in case of no lock, one of the delete operations should eventually fail
+			waitFuture.get();
+
+			assertFalse(blobServer.getStorageLocation(blobKey).exists());
+		} finally {
+			executor.shutdownNow();
+		}
+	}
+
 	private void cleanup(BlobServer server, BlobClient client) {
 		if (client != null) {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 6d1dba8..a3de02f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -18,27 +18,57 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests how failing GET requests behave in the presence of failures.
  * Successful GET requests are tested in conjunction wit the PUT
  * requests.
  */
-public class BlobServerGetTest {
+public class BlobServerGetTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Test
 	public void testGetFailsDuringLookup() throws IOException {
 		BlobServer server = null;
@@ -128,4 +158,87 @@ public class BlobServerGetTest {
 			}
 		}
 	}
+
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
+	 */
+	@Test
+	public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException {
+		final Configuration configuration = new Configuration();
+
+		configuration.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, temporaryFolder.newFolder().getAbsolutePath());
+
+		final BlobStore blobStore = mock(BlobStore.class);
+
+		final int numberConcurrentGetOperations = 3;
+		final List<Future<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+
+		final byte[] data = {1, 2, 3, 4, 99, 42};
+		final ByteArrayInputStream bais = new ByteArrayInputStream(data);
+
+		MessageDigest md = BlobUtils.createMessageDigest();
+
+		// create the correct blob key by hashing our input data
+		final BlobKey blobKey = new BlobKey(md.digest(data));
+
+		doAnswer(
+			new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocation) throws Throwable {
+					File targetFile = (File) invocation.getArguments()[1];
+
+					FileUtils.copyInputStreamToFile(bais, targetFile);
+
+					return null;
+				}
+			}
+		).when(blobStore).get(any(BlobKey.class), any(File.class));
+
+		final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
+
+		try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+			for (int i = 0; i < numberConcurrentGetOperations; i++) {
+				Future<InputStream> getOperation = FlinkCompletableFuture.supplyAsync(new Callable<InputStream>() {
+					@Override
+					public InputStream call() throws Exception {
+						try (BlobClient blobClient = blobServer.createClient();
+							 InputStream inputStream = blobClient.get(blobKey)) {
+							byte[] buffer = new byte[data.length];
+
+							IOUtils.readFully(inputStream, buffer);
+
+							return new ByteArrayInputStream(buffer);
+						}
+					}
+				}, executor);
+
+				getOperations.add(getOperation);
+			}
+
+			Future<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
+
+			Collection<InputStream> inputStreams = inputStreamsFuture.get();
+
+			// check that we have read the right data
+			for (InputStream inputStream : inputStreams) {
+				ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
+
+				IOUtils.copy(inputStream, baos);
+
+				baos.close();
+				byte[] input = baos.toByteArray();
+
+				assertArrayEquals(data, input);
+
+				inputStream.close();
+			}
+
+			// verify that we downloaded the requested blob exactly once from the BlobStore
+			verify(blobStore, times(1)).get(eq(blobKey), any(File.class));
+		} finally {
+			executor.shutdownNow();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 441ca7d..35ef968 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -28,16 +33,29 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for successful and failing PUT operations against the BLOB server,
  * and successful GET operations.
  */
-public class BlobServerPutTest {
+public class BlobServerPutTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
@@ -299,6 +317,95 @@ public class BlobServerPutTest {
 		}
 	}
 
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
+	 */
+	@Test
+	public void testConcurrentPutOperations() throws IOException, ExecutionException, InterruptedException {
+		final Configuration configuration = new Configuration();
+		BlobStore blobStore = mock(BlobStore.class);
+		int concurrentPutOperations = 2;
+		int dataSize = 1024;
+
+		final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
+		final byte[] data = new byte[dataSize];
+
+		ArrayList<Future<BlobKey>> allFutures = new ArrayList(concurrentPutOperations);
+
+		ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
+
+		try (
+			final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+
+			for (int i = 0; i < concurrentPutOperations; i++) {
+				Future<BlobKey> putFuture = FlinkCompletableFuture.supplyAsync(new Callable<BlobKey>() {
+					@Override
+					public BlobKey call() throws Exception {
+						try (BlobClient blobClient = blobServer.createClient()) {
+							return blobClient.put(new BlockingInputStream(countDownLatch, data));
+						}
+					}
+				}, executor);
+
+				allFutures.add(putFuture);
+			}
+
+			FutureUtils.ConjunctFuture<Collection<BlobKey>> conjunctFuture = FutureUtils.combineAll(allFutures);
+
+			// wait until all operations have completed and check that no exception was thrown
+			Collection<BlobKey> blobKeys = conjunctFuture.get();
+
+			Iterator<BlobKey> blobKeyIterator = blobKeys.iterator();
+
+			assertTrue(blobKeyIterator.hasNext());
+
+			BlobKey blobKey = blobKeyIterator.next();
+
+			// make sure that all blob keys are the same
+			while(blobKeyIterator.hasNext()) {
+				assertEquals(blobKey, blobKeyIterator.next());
+			}
+
+			// check that we only uploaded the file once to the blob store
+			verify(blobStore, times(1)).put(any(File.class), eq(blobKey));
+		} finally {
+			executor.shutdownNow();
+		}
+	}
+
+	private static final class BlockingInputStream extends InputStream {
+
+		private final CountDownLatch countDownLatch;
+		private final byte[] data;
+		private int index = 0;
+
+		public BlockingInputStream(CountDownLatch countDownLatch, byte[] data) {
+			this.countDownLatch = Preconditions.checkNotNull(countDownLatch);
+			this.data = Preconditions.checkNotNull(data);
+		}
+
+		@Override
+		public int read() throws IOException {
+
+			countDownLatch.countDown();
+
+			try {
+				countDownLatch.await();
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new IOException("Blocking operation was interrupted.", e);
+			}
+
+			if (index >= data.length) {
+				return -1;
+			} else {
+				return data[index++];
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private static final class ChunkedInputStream extends InputStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 98f136a..7ba1633 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')