You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:16 UTC

[09/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Use single timeout task for SlotManager

[FLINK-5810] [flip-6] Use single timeout task for SlotManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d75ec5b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d75ec5b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d75ec5b3

Branch: refs/heads/table-retraction
Commit: d75ec5b3551573d4eb1886c8e75dfdf6dc328da1
Parents: d16a5a2
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Apr 27 17:46:29 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 28 15:28:26 2017 +0200

----------------------------------------------------------------------
 .../slotmanager/PendingSlotRequest.java         |  35 +--
 .../slotmanager/SlotManager.java                | 219 ++++++++------
 .../slotmanager/TaskManagerRegistration.java    |  38 +--
 .../clusterframework/ResourceManagerTest.java   |   8 +-
 .../slotmanager/SlotManagerTest.java            | 291 +++++++++----------
 .../slotmanager/SlotProtocolTest.java           |   2 -
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../runtime/testingUtils/TestingUtils.scala     |   4 +-
 8 files changed, 295 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index 1195791..ffe1bfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -27,8 +27,6 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
-import java.util.UUID;
-import java.util.concurrent.ScheduledFuture;
 
 public class PendingSlotRequest {
 
@@ -37,14 +35,12 @@ public class PendingSlotRequest {
 	@Nullable
 	private CompletableFuture<Acknowledge> requestFuture;
 
-	@Nullable
-	private UUID timeoutIdentifier;
-
-	@Nullable
-	private ScheduledFuture<?> timeoutFuture;
+	/** Timestamp when this pending slot request has been created. */
+	private final long creationTimestamp;
 
 	public PendingSlotRequest(SlotRequest slotRequest) {
 		this.slotRequest = Preconditions.checkNotNull(slotRequest);
+		creationTimestamp = System.currentTimeMillis();
 	}
 
 	// ------------------------------------------------------------------------
@@ -57,11 +53,6 @@ public class PendingSlotRequest {
 		return slotRequest.getResourceProfile();
 	}
 
-	@Nullable
-	public UUID getTimeoutIdentifier() {
-		return timeoutIdentifier;
-	}
-
 	public JobID getJobId() {
 		return slotRequest.getJobId();
 	}
@@ -70,6 +61,10 @@ public class PendingSlotRequest {
 		return slotRequest.getTargetAddress();
 	}
 
+	public long getCreationTimestamp() {
+		return creationTimestamp;
+	}
+
 	public boolean isAssigned() {
 		return null != requestFuture;
 	}
@@ -82,20 +77,4 @@ public class PendingSlotRequest {
 	public CompletableFuture<Acknowledge> getRequestFuture() {
 		return requestFuture;
 	}
-
-	public void cancelTimeout() {
-		if (timeoutFuture != null) {
-			timeoutFuture.cancel(true);
-
-			timeoutIdentifier = null;
-			timeoutFuture = null;
-		}
-	}
-
-	public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) {
-		cancelTimeout();
-
-		timeoutFuture = newTimeoutFuture;
-		timeoutIdentifier = newTimeoutIdentifier;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f09b73a..829a06d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -64,11 +64,10 @@ import java.util.concurrent.TimeoutException;
  * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
  *
  * In order to free resources and avoid resource leaks, idling task managers (task managers whose
- * slots are currently not used) and not fulfilled pending slot requests time out triggering their
- * release and failure, respectively.
+ * slots are currently not used) and pending slot requests time out triggering their release and
+ * failure, respectively.
  */
 public class SlotManager implements AutoCloseable {
-
 	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
 
 	/** Scheduled executor for timeouts */
@@ -107,6 +106,10 @@ public class SlotManager implements AutoCloseable {
 	/** Callbacks for resource (de-)allocations */
 	private ResourceManagerActions resourceManagerActions;
 
+	private ScheduledFuture<?> taskManagerTimeoutCheck;
+
+	private ScheduledFuture<?> slotRequestTimeoutCheck;
+
 	/** True iff the component has been started */
 	private boolean started;
 
@@ -128,6 +131,10 @@ public class SlotManager implements AutoCloseable {
 
 		leaderId = null;
 		resourceManagerActions = null;
+		mainThreadExecutor = null;
+		taskManagerTimeoutCheck = null;
+		slotRequestTimeoutCheck = null;
+
 		started = false;
 	}
 
@@ -142,17 +149,52 @@ public class SlotManager implements AutoCloseable {
 	 * @param newResourceManagerActions to use for resource (de-)allocations
 	 */
 	public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) {
+		LOG.info("Starting the SlotManager.");
+
 		leaderId = Preconditions.checkNotNull(newLeaderId);
 		mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
 		resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions);
 
 		started = true;
+
+		taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+			@Override
+			public void run() {
+				mainThreadExecutor.execute(new Runnable() {
+					@Override
+					public void run() {
+						checkTaskManagerTimeouts();
+					}
+				});
+			}
+		}, 0L, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+		slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+			@Override
+			public void run() {
+				mainThreadExecutor.execute(new Runnable() {
+					@Override
+					public void run() {
+						checkSlotRequestTimeouts();
+					}
+				});
+			}
+		}, 0L, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 	}
 
 	/**
 	 * Suspends the component. This clears the internal state of the slot manager.
 	 */
 	public void suspend() {
+		LOG.info("Suspending the SlotManager.");
+
+		// stop the timeout checks for the TaskManagers and the SlotRequests
+		taskManagerTimeoutCheck.cancel(false);
+		slotRequestTimeoutCheck.cancel(false);
+
+		taskManagerTimeoutCheck = null;
+		slotRequestTimeoutCheck = null;
+
 		for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
 			cancelPendingSlotRequest(pendingSlotRequest);
 		}
@@ -177,6 +219,8 @@ public class SlotManager implements AutoCloseable {
 	 */
 	@Override
 	public void close() throws Exception {
+		LOG.info("Closing the SlotManager.");
+
 		suspend();
 	}
 
@@ -249,6 +293,8 @@ public class SlotManager implements AutoCloseable {
 	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
 		checkInit();
 
+		LOG.info("Register TaskManager {} at the SlotManager.", taskExecutorConnection.getInstanceID());
+
 		// we identify task managers by their instance id
 		if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
 			reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
@@ -272,8 +318,13 @@ public class SlotManager implements AutoCloseable {
 					taskExecutorConnection);
 			}
 
-			if (!anySlotUsed(taskManagerRegistration.getSlots())) {
-				registerTaskManagerTimeout(taskManagerRegistration);
+			// determine if the task manager is idle or not
+			boolean idle = !anySlotUsed(taskManagerRegistration.getSlots());
+
+			if (idle) {
+				taskManagerRegistration.markIdle();
+			} else {
+				taskManagerRegistration.markUsed();
 			}
 		}
 
@@ -292,9 +343,7 @@ public class SlotManager implements AutoCloseable {
 		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
 
 		if (null != taskManagerRegistration) {
-			removeSlots(taskManagerRegistration.getSlots());
-
-			taskManagerRegistration.cancelTimeout();
+			internalUnregisterTaskManager(taskManagerRegistration);
 
 			return true;
 		} else {
@@ -334,8 +383,11 @@ public class SlotManager implements AutoCloseable {
 			}
 
 			if (idle) {
-				// no slot of this task manager is being used --> register timer to free this resource
-				registerTaskManagerTimeout(taskManagerRegistration);
+				// no slot of this task manager is being used --> mark this task manager to be idle which allows it to
+				// time out
+				taskManagerRegistration.markIdle();
+			} else {
+				taskManagerRegistration.markUsed();
 			}
 
 			return true;
@@ -371,9 +423,14 @@ public class SlotManager implements AutoCloseable {
 
 					TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
 
-					if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) {
-						registerTaskManagerTimeout(taskManagerRegistration);
+					if (null != taskManagerRegistration) {
+						if (anySlotUsed(taskManagerRegistration.getSlots())) {
+							taskManagerRegistration.markUsed();
+						} else {
+							taskManagerRegistration.markIdle();
+						}
 					}
+
 				} else {
 					LOG.debug("Received request to free slot {} with expected allocation id {}, " +
 						"but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
@@ -524,8 +581,8 @@ public class SlotManager implements AutoCloseable {
 				TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
 
 				if (null != taskManagerRegistration) {
-					// disable any registered time out for the task manager
-					taskManagerRegistration.cancelTimeout();
+					// mark this TaskManager to be used to exempt it from timing out
+					taskManagerRegistration.markUsed();
 				}
 			}
 
@@ -551,24 +608,6 @@ public class SlotManager implements AutoCloseable {
 		if (taskManagerSlot != null) {
 			allocateSlot(taskManagerSlot, pendingSlotRequest);
 		} else {
-			final UUID timeoutIdentifier = UUID.randomUUID();
-			final AllocationID allocationId = pendingSlotRequest.getAllocationId();
-
-			// register timeout for slot request
-			ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() {
-				@Override
-				public void run() {
-					mainThreadExecutor.execute(new Runnable() {
-						@Override
-						public void run() {
-							timeoutSlotRequest(allocationId, timeoutIdentifier);
-						}
-					});
-				}
-			}, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-			pendingSlotRequest.registerTimeout(timeoutFuture, timeoutIdentifier);
-
 			resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile());
 		}
 	}
@@ -591,6 +630,16 @@ public class SlotManager implements AutoCloseable {
 		taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest);
 		pendingSlotRequest.setRequestFuture(completableFuture);
 
+		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+
+		if (taskManagerRegistration != null) {
+			// mark the task manager to be used since we have a pending slot request assigned ot one of its slots
+			taskManagerRegistration.markUsed();
+		} else {
+			throw new IllegalStateException("Could not find a registered task manager for instance id " +
+				taskManagerSlot.getInstanceId() + '.');
+		}
+
 		// RPC call to the task manager
 		Future<Acknowledge> requestFuture = gateway.requestSlot(
 			slotId,
@@ -717,7 +766,7 @@ public class SlotManager implements AutoCloseable {
 			TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
 
 			if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) {
-				registerTaskManagerTimeout(taskManagerRegistration);
+				taskManagerRegistration.markIdle();
 			}
 		} else {
 			LOG.debug("There was no slot with {} registered. Probably this slot has been already freed.", slotId);
@@ -778,8 +827,6 @@ public class SlotManager implements AutoCloseable {
 	 * @param pendingSlotRequest to cancel
 	 */
 	private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
-		pendingSlotRequest.cancelTimeout();
-
 		CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture();
 
 		if (null != request) {
@@ -791,54 +838,50 @@ public class SlotManager implements AutoCloseable {
 	// Internal timeout methods
 	// ---------------------------------------------------------------------------------------------
 
-	private void timeoutTaskManager(InstanceID instanceId, UUID timeoutIdentifier) {
-		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
+	private void checkTaskManagerTimeouts() {
+		if (!taskManagerRegistrations.isEmpty()) {
+			long currentTime = System.currentTimeMillis();
+
+			Iterator<Map.Entry<InstanceID, TaskManagerRegistration>> taskManagerRegistrationIterator = taskManagerRegistrations.entrySet().iterator();
+
+			while (taskManagerRegistrationIterator.hasNext()) {
+				TaskManagerRegistration taskManagerRegistration = taskManagerRegistrationIterator.next().getValue();
 
-		if (null != taskManagerRegistration) {
-			if (Objects.equals(timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier())) {
 				if (anySlotUsed(taskManagerRegistration.getSlots())) {
-					LOG.debug("Cannot release the task manager with instance id {}, because some " +
-						"of its slots are still being used.", instanceId);
-				} else {
-					unregisterTaskManager(instanceId);
+					taskManagerRegistration.markUsed();
+				} else if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
+					taskManagerRegistrationIterator.remove();
 
-					resourceManagerActions.releaseResource(instanceId);
-				}
-			} else {
-				taskManagerRegistrations.put(instanceId, taskManagerRegistration);
+					internalUnregisterTaskManager(taskManagerRegistration);
 
-				LOG.debug("Expected timeout identifier {} differs from the task manager's " +
-					"timeout identifier {}. Ignoring the task manager timeout call.",
-					timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier());
+					resourceManagerActions.releaseResource(taskManagerRegistration.getInstanceId());
+				}
 			}
-		} else {
-			LOG.debug("Could not find a registered task manager with instance id {}. Ignoring the task manager timeout call.", instanceId);
 		}
 	}
 
-	private void timeoutSlotRequest(AllocationID allocationId, UUID timeoutIdentifier) {
-		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
+	private void checkSlotRequestTimeouts() {
+		if (!pendingSlotRequests.isEmpty()) {
+			long currentTime = System.currentTimeMillis();
 
-		if (null != pendingSlotRequest) {
-			if (Objects.equals(timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier())) {
-				if (!pendingSlotRequest.isAssigned()) {
+			Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();
+
+			while (slotRequestIterator.hasNext()) {
+				PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();
+
+				if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
+					slotRequestIterator.remove();
+
+					if (slotRequest.isAssigned()) {
+						cancelPendingSlotRequest(slotRequest);
+					}
 
 					resourceManagerActions.notifyAllocationFailure(
-						pendingSlotRequest.getJobId(),
-						allocationId,
+						slotRequest.getJobId(),
+						slotRequest.getAllocationId(),
 						new TimeoutException("The allocation could not be fulfilled in time."));
-				} else {
-					LOG.debug("Cannot fail pending slot request {} because it has been assigned.", allocationId);
 				}
-			} else {
-				pendingSlotRequests.put(allocationId, pendingSlotRequest);
-
-				LOG.debug("Expected timeout identifier {} differs from the pending slot request's " +
-					"timeout identifier {}. Ignoring the slot request timeout call.",
-					timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier());
 			}
-		} else {
-			LOG.debug("Could not find pending slot request with allocation id {}. Ignoring the slot request timeout call.", allocationId);
 		}
 	}
 
@@ -846,6 +889,12 @@ public class SlotManager implements AutoCloseable {
 	// Internal utility methods
 	// ---------------------------------------------------------------------------------------------
 
+	private void internalUnregisterTaskManager(TaskManagerRegistration taskManagerRegistration) {
+		Preconditions.checkNotNull(taskManagerRegistration);
+
+		removeSlots(taskManagerRegistration.getSlots());
+	}
+
 	private boolean checkDuplicateRequest(AllocationID allocationId) {
 		return pendingSlotRequests.containsKey(allocationId) || fulfilledSlotRequests.containsKey(allocationId);
 	}
@@ -853,38 +902,18 @@ public class SlotManager implements AutoCloseable {
 	private boolean anySlotUsed(Iterable<SlotID> slotsToCheck) {
 
 		if (null != slotsToCheck) {
-			boolean idle = true;
-
 			for (SlotID slotId : slotsToCheck) {
 				TaskManagerSlot taskManagerSlot = slots.get(slotId);
 
 				if (null != taskManagerSlot) {
-					idle &= taskManagerSlot.isFree();
+					if (taskManagerSlot.isAllocated()) {
+						return true;
+					}
 				}
 			}
-
-			return !idle;
-		} else {
-			return false;
 		}
-	}
-
-	private void registerTaskManagerTimeout(final TaskManagerRegistration taskManagerRegistration) {
-		final UUID timeoutIdentifier = UUID.randomUUID();
-
-		ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() {
-			@Override
-			public void run() {
-				mainThreadExecutor.execute(new Runnable() {
-					@Override
-					public void run() {
-						timeoutTaskManager(taskManagerRegistration.getInstanceId(), timeoutIdentifier);
-					}
-				});
-			}
-		}, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-		taskManagerRegistration.registerTimeout(timeoutFuture, timeoutIdentifier);
+		return false;
 	}
 
 	private void checkInit() {
@@ -911,11 +940,11 @@ public class SlotManager implements AutoCloseable {
 	}
 
 	@VisibleForTesting
-	boolean hasTimeoutRegistered(InstanceID instanceId) {
+	boolean isTaskManagerIdle(InstanceID instanceId) {
 		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
 
 		if (null != taskManagerRegistration) {
-			return taskManagerRegistration.getTimeoutIdentifier() != null;
+			return taskManagerRegistration.isIdle();
 		} else {
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
index 3a15cb3..7d3764c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -25,8 +25,6 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.UUID;
-import java.util.concurrent.ScheduledFuture;
 
 public class TaskManagerRegistration {
 
@@ -34,9 +32,8 @@ public class TaskManagerRegistration {
 
 	private final HashSet<SlotID> slots;
 
-	private UUID timeoutIdentifier;
-
-	private ScheduledFuture<?> timeoutFuture;
+	/** Timestamp when the last time becoming idle. Otherwise Long.MAX_VALUE. */
+	private long idleSince;
 
 	public TaskManagerRegistration(
 		TaskExecutorConnection taskManagerConnection,
@@ -47,8 +44,7 @@ public class TaskManagerRegistration {
 
 		this.slots = new HashSet<>(slots);
 
-		timeoutIdentifier = null;
-		timeoutFuture = null;
+		idleSince = Long.MAX_VALUE;
 	}
 
 	public TaskExecutorConnection getTaskManagerConnection() {
@@ -59,31 +55,27 @@ public class TaskManagerRegistration {
 		return taskManagerConnection.getInstanceID();
 	}
 
-	public UUID getTimeoutIdentifier() {
-		return timeoutIdentifier;
-	}
-
 	public Iterable<SlotID> getSlots() {
 		return slots;
 	}
 
-	public boolean containsSlot(SlotID slotId) {
-		return slots.contains(slotId);
+	public long getIdleSince() {
+		return idleSince;
 	}
 
-	public void cancelTimeout() {
-		if (null != timeoutFuture) {
-			timeoutFuture.cancel(false);
+	public boolean isIdle() {
+		return idleSince != Long.MAX_VALUE;
+	}
 
-			timeoutFuture = null;
-			timeoutIdentifier = null;
-		}
+	public void markIdle() {
+		idleSince = System.currentTimeMillis();
 	}
 
-	public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) {
-		cancelTimeout();
+	public void markUsed() {
+		idleSince = Long.MAX_VALUE;
+	}
 
-		timeoutFuture = newTimeoutFuture;
-		timeoutIdentifier = newTimeoutIdentifier;
+	public boolean containsSlot(SlotID slotId) {
+		return slots.contains(slotId);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 41c2e16..c740518 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -488,13 +488,17 @@ public class ResourceManagerTest extends TestLogger {
 		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
 
-		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		final MetricRegistry metricRegistry = mock(MetricRegistry.class);
 		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			highAvailabilityServices,
 			rpcService.getScheduledExecutor(),
 			Time.minutes(5L));
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final SlotManager slotManager = new SlotManager(
+			TestingUtils.defaultScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
 
 		try {
 			final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
@@ -504,7 +508,7 @@ public class ResourceManagerTest extends TestLogger {
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,
-				slotManagerFactory,
+				slotManager,
 				metricRegistry,
 				jobLeaderIdService,
 				testingFatalErrorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index fff2829..39c5f25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -25,10 +25,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.*;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -38,7 +37,6 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
-import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -46,9 +44,11 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -648,7 +648,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testTaskManagerTimeout() throws Exception {
-		final long tmTimeout = 50L;
+		final long tmTimeout = 500L;
 
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 		final UUID leaderId = UUID.randomUUID();
@@ -661,7 +661,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
 		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		final Executor mainThreadExecutor = mock(Executor.class);
+		final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
 
 		try (SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),
@@ -671,24 +671,21 @@ public class SlotManagerTest extends TestLogger {
 
 			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
 
-			slotManager.registerTaskManager(taskManagerConnection, slotReport);
-
-			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
-
-			verify(mainThreadExecutor, timeout(tmTimeout * 10L)).execute(runnableArgumentCaptor.capture());
+			mainThreadExecutor.execute(new Runnable() {
+				@Override
+				public void run() {
+					slotManager.registerTaskManager(taskManagerConnection, slotReport);
+				}
+			});
 
-			// the only runnable being executed by the main thread executor should be the timeout runnable
-			Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
-
-			timeoutRunnable.run();
-
-			verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
+			verify(resourceManagerActions, timeout(100L * tmTimeout).times(1))
+				.releaseResource(eq(taskManagerConnection.getInstanceID()));
 		}
 	}
 
 	/**
 	 * Tests that slot requests time out after the specified request timeout. If a slot request
-	 * times out, then the request is cancelled, removed from the slot manager and the resourc
+	 * times out, then the request is cancelled, removed from the slot manager and the resource
 	 * manager is notified about the failed allocation.
 	 */
 	@Test
@@ -703,7 +700,7 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
 
-		final Executor mainThreadExecutor = mock(Executor.class);
+		final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
 
 		try (SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),
@@ -713,21 +710,27 @@ public class SlotManagerTest extends TestLogger {
 
 			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
 
-			assertTrue(slotManager.registerSlotRequest(slotRequest));
-
-			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
-
-			verify(mainThreadExecutor, timeout(allocationTimeout * 10L)).execute(runnableArgumentCaptor.capture());
-
-			// the only runnable being executed by the main thread executor should be the timeout runnable
-			Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+			final AtomicReference<Exception> atomicException = new AtomicReference<>(null);
 
-			timeoutRunnable.run();
+			mainThreadExecutor.execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						assertTrue(slotManager.registerSlotRequest(slotRequest));
+					} catch (Exception e) {
+						atomicException.compareAndSet(null, e);
+					}
+				}
+			});
 
-			verify(resourceManagerActions, times(1)).notifyAllocationFailure(
+			verify(resourceManagerActions, timeout(100L * allocationTimeout).times(1)).notifyAllocationFailure(
 				eq(jobId),
 				eq(allocationId),
 				any(TimeoutException.class));
+
+			if (atomicException.get() != null) {
+				throw atomicException.get();
+			}
 		}
 	}
 
@@ -815,6 +818,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testSlotReportWhileActiveSlotRequest() throws Exception {
+		final long verifyTimeout = 1000L;
 		final UUID leaderId = UUID.randomUUID();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
@@ -842,20 +846,37 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
 		final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
 
-		// we have to manually trigger the future call backs to simulate the main thread executor behaviour
-		final Executor mainThreadExecutorMock = mock(Executor.class);
+		final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
 
-		try (SlotManager slotManager = new SlotManager(
+		try (final SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime())) {
 
-			slotManager.start(leaderId, mainThreadExecutorMock, resourceManagerActions);
-
-			slotManager.registerTaskManager(taskManagerConnection, slotReport);
+			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
 
-			slotManager.registerSlotRequest(slotRequest);
+			Future<Void> registrationFuture = FlinkFuture.supplyAsync(new Callable<Void>() {
+				@Override
+				public Void call() throws Exception {
+					slotManager.registerTaskManager(taskManagerConnection, slotReport);
+
+					return null;
+				}
+			}, mainThreadExecutor)
+			.thenAccept(new AcceptFunction<Void>() {
+				@Override
+				public void accept(Void value) {
+					try {
+						slotManager.registerSlotRequest(slotRequest);
+					} catch (SlotManagerException e) {
+						throw new RuntimeException("Could not register slots.", e);
+					}
+				}
+			});
+
+			// check that no exception has been thrown
+			registrationFuture.get();
 
 			ArgumentCaptor<SlotID> slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
 
@@ -867,26 +888,33 @@ public class SlotManagerTest extends TestLogger {
 				eq(leaderId),
 				any(Time.class));
 
-			final SlotID requestedSlotdId = slotIdCaptor.getValue();
-			final SlotID freeSlotId = requestedSlotdId.equals(slotId1) ? slotId2 : slotId1;
+			final SlotID requestedSlotId = slotIdCaptor.getValue();
+			final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1;
+
+			Future<Boolean> freeSlotFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
+				@Override
+				public Boolean call() throws Exception {
+					return slotManager.getSlot(freeSlotId).isFree();
+				}
+			}, mainThreadExecutor);
 
-			assertTrue(slotManager.getSlot(freeSlotId).isFree());
+			assertTrue(freeSlotFuture.get());
 
 			final SlotStatus newSlotStatus1 = new SlotStatus(slotIdCaptor.getValue(), resourceProfile, new JobID(), new AllocationID());
 			final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
 			final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
 
-			// this should update the slot with the pending slot request triggering the reassignment of it
-			slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
+			FlinkFuture.supplyAsync(new Callable<Void>() {
+				@Override
+				public Void call() throws Exception {
+					// this should update the slot with the pending slot request triggering the reassignment of it
+					slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
 
-			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
-			verify(mainThreadExecutorMock).execute(runnableArgumentCaptor.capture());
+					return null;
+				}
+			}, mainThreadExecutor);
 
-			Runnable requestFailureRunnable = runnableArgumentCaptor.getValue();
-
-			requestFailureRunnable.run();
-
-			verify(taskExecutorGateway, times(2)).requestSlot(
+			verify(taskExecutorGateway, timeout(verifyTimeout).times(2)).requestSlot(
 				slotIdCaptor.capture(),
 				eq(jobId),
 				eq(allocationId),
@@ -894,16 +922,18 @@ public class SlotManagerTest extends TestLogger {
 				eq(leaderId),
 				any(Time.class));
 
-			verify(mainThreadExecutorMock, times(2)).execute(runnableArgumentCaptor.capture());
-			Runnable requestSuccessRunnable = runnableArgumentCaptor.getValue();
+			final SlotID requestedSlotId2 = slotIdCaptor.getValue();
 
-			requestSuccessRunnable.run();
-
-			final SlotID requestedSlotId = slotIdCaptor.getValue();
+			assertEquals(slotId2, requestedSlotId2);
 
-			assertEquals(slotId2, requestedSlotId);
+			Future<TaskManagerSlot> requestedSlotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
+				@Override
+				public TaskManagerSlot call() throws Exception {
+					return slotManager.getSlot(requestedSlotId2);
+				}
+			}, mainThreadExecutor);
 
-			TaskManagerSlot slot = slotManager.getSlot(requestedSlotId);
+			TaskManagerSlot slot = requestedSlotFuture.get();
 
 			assertTrue(slot.isAllocated());
 			assertEquals(allocationId, slot.getAllocationId());
@@ -916,11 +946,12 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testTimeoutForUnusedTaskManager() throws Exception {
-		final long taskManagerTimeout = 123456L;
+		final long taskManagerTimeout = 50L;
+		final long verifyTimeout = taskManagerTimeout * 10L;
 
 		final UUID leaderId = UUID.randomUUID();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
-		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
 
 		final ResourceID resourceId = ResourceID.generate();
 
@@ -946,21 +977,34 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
 		final SlotReport initialSlotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
 
-		try (SlotManager slotManager = new SlotManager(
+		final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
+
+		try (final SlotManager slotManager = new SlotManager(
 			scheduledExecutor,
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime(),
 			Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))) {
 
-			slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions);
-
-			slotManager.registerSlotRequest(slotRequest);
+			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
 
-			slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
+			FlinkFuture.supplyAsync(new Callable<Void>() {
+				@Override
+				public Void call() throws Exception {
+					slotManager.registerSlotRequest(slotRequest);
+
+					return null;
+				}
+			}, mainThreadExecutor)
+			.thenAccept(new AcceptFunction<Void>() {
+				@Override
+				public void accept(Void value) {
+					slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
+				}
+			});
 
 			ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
 
-			verify(taskExecutorGateway).requestSlot(
+			verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot(
 				slotIdArgumentCaptor.capture(),
 				eq(jobId),
 				eq(allocationId),
@@ -968,103 +1012,48 @@ public class SlotManagerTest extends TestLogger {
 				eq(leaderId),
 				any(Time.class));
 
-			assertFalse(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
-
-			SlotID slotId = slotIdArgumentCaptor.getValue();
-			TaskManagerSlot slot = slotManager.getSlot(slotId);
-
-			assertTrue(slot.isAllocated());
-			assertEquals(allocationId, slot.getAllocationId());
-
-			slotManager.freeSlot(slotId, allocationId);
+			Future<Boolean> idleFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
+				@Override
+				public Boolean call() throws Exception {
+					return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
+				}
+			}, mainThreadExecutor);
 
-			assertTrue(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
+			// check that the TaskManaer is not idle
+			assertFalse(idleFuture.get());
 
-			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+			final SlotID slotId = slotIdArgumentCaptor.getValue();
 
-			// filter out the schedule call for the task manager which will be registered using the
-			// taskManagerTimeout value
-			verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(taskManagerTimeout), eq(TimeUnit.MILLISECONDS));
+			Future<TaskManagerSlot> slotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
+				@Override
+				public TaskManagerSlot call() throws Exception {
+					return slotManager.getSlot(slotId);
+				}
+			}, mainThreadExecutor);
 
-			Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+			TaskManagerSlot slot = slotFuture.get();
 
-			timeoutRunnable.run();
-
-			verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
-		}
-	}
-
-	/**
-	 * Tests that the slot manager re-registers a timeout for a rejected slot request.
-	 */
-	@Test
-	public void testTimeoutForRejectedSlotRequest() throws Exception {
-
-		final long slotRequestTimeout = 1337L;
-		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
-
-		final ResourceID resourceId = ResourceID.generate();
-		final SlotID slotId = new SlotID(resourceId, 0);
-		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
-		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
-		final SlotReport slotReport = new SlotReport(slotStatus);
-
-		final UUID leaderId = UUID.randomUUID();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
-
-		final JobID jobId = new JobID();
-		final AllocationID allocationId = new AllocationID();
-		final AllocationID allocationId2 = new AllocationID();
-		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
-
-		CompletableFuture<Acknowledge> requestFuture = new FlinkCompletableFuture<>();
-
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			eq(slotId),
-			eq(jobId),
-			eq(allocationId),
-			anyString(),
-			eq(leaderId),
-			any(Time.class))).thenReturn(requestFuture);
-
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
-		try (SlotManager slotManager = new SlotManager(
-			scheduledExecutor,
-			TestingUtils.infiniteTime(),
-			Time.milliseconds(slotRequestTimeout),
-			TestingUtils.infiniteTime())) {
-
-			slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions);
-
-			slotManager.registerTaskManager(taskManagerConnection, slotReport);
-
-			slotManager.registerSlotRequest(slotRequest);
-
-			verify(taskExecutorGateway).requestSlot(
-				eq(slotId),
-				eq(jobId),
-				eq(allocationId),
-				anyString(),
-				eq(leaderId),
-				any(Time.class));
-
-			requestFuture.completeExceptionally(new SlotOccupiedException("Slot is already occupied", allocationId2));
-
-			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
-			verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(slotRequestTimeout), eq(TimeUnit.MILLISECONDS));
+			assertTrue(slot.isAllocated());
+			assertEquals(allocationId, slot.getAllocationId());
 
-			Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+			Future<Boolean> idleFuture2 = FlinkFuture.supplyAsync(new Callable<Void>() {
+				@Override
+				public Void call() throws Exception {
+					slotManager.freeSlot(slotId, allocationId);
 
-			timeoutRunnable.run();
+					return null;
+				}
+			}, mainThreadExecutor)
+			.thenApply(new ApplyFunction<Void, Boolean>() {
+				@Override
+				public Boolean apply(Void value) {
+					return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
+				}
+			});
 
-			verify(resourceManagerActions).notifyAllocationFailure(eq(jobId), eq(allocationId), any(Exception.class));
+			assertTrue(idleFuture2.get());
 
-			TaskManagerSlot slot = slotManager.getSlot(slotId);
-
-			assertTrue(slot.isAllocated());
-			assertEquals(allocationId2, slot.getAllocationId());
+			verify(resourceManagerActions, timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index c09316c..a1ab1ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -76,7 +76,6 @@ public class SlotProtocolTest extends TestLogger {
 	@Test
 	public void testSlotsUnavailableRequest() throws Exception {
 		final JobID jobID = new JobID();
-		final ResourceID jmResourceId = new ResourceID(jmAddress);
 
 		final UUID rmLeaderID = UUID.randomUUID();
 
@@ -133,7 +132,6 @@ public class SlotProtocolTest extends TestLogger {
 	@Test
 	public void testSlotAvailableRequest() throws Exception {
 		final JobID jobID = new JobID();
-		final ResourceID jmResourceId = new ResourceID(jmAddress);
 
 		final UUID rmLeaderID = UUID.randomUUID();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..812a256 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=DEBUG, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/d75ec5b3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 03b5172..876e26b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -82,7 +82,7 @@ object TestingUtils {
   def getDefaultTestingActorSystemConfig = testConfig
 
   def infiniteTime: Time = {
-    Time.milliseconds(Long.MaxValue);
+    Time.milliseconds(Integer.MAX_VALUE);
   }
   
 
@@ -113,7 +113,7 @@ object TestingUtils {
   def defaultExecutor: ScheduledExecutorService = {
     synchronized {
       if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown) {
-        sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor()
+        sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor();
       }
 
       sharedExecutorInstance