You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 15:33:35 UTC

[1/6] flink git commit: [FLINK-8120] [flip6] Register Yarn application with correct tracking URL

Repository: flink
Updated Branches:
  refs/heads/master e80dd8ea3 -> 331ce82c7


[FLINK-8120] [flip6] Register Yarn application with correct tracking URL

The cluster entrypoints start the ResourceManager with the web interface URL.
This URL is used to set the correct tracking URL in Yarn when registering the
Yarn application.

This closes #5128.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/627bcda6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/627bcda6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/627bcda6

Branch: refs/heads/master
Commit: 627bcda6957b2ad61b67b98a7d0a1de2c1f3eb29
Parents: e80dd8e
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 6 18:01:58 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 14:32:23 2017 +0100

----------------------------------------------------------------------
 .../entrypoint/MesosJobClusterEntrypoint.java   | 17 +++--
 .../MesosSessionClusterEntrypoint.java          | 17 +++--
 .../entrypoint/JobClusterEntrypoint.java        |  6 +-
 .../entrypoint/SessionClusterEntrypoint.java    |  6 +-
 .../StandaloneSessionClusterEntrypoint.java     |  5 +-
 .../apache/flink/yarn/YarnResourceManager.java  | 67 +++++++++++++-------
 .../entrypoint/YarnJobClusterEntrypoint.java    |  8 ++-
 .../YarnSessionClusterEntrypoint.java           |  8 ++-
 .../flink/yarn/YarnResourceManagerTest.java     | 32 ++++++++--
 9 files changed, 113 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/627bcda6/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 2fe99de..874ee14 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -53,6 +53,8 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -124,13 +126,14 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
 	@Override
 	protected ResourceManager<?> createResourceManager(
-		Configuration configuration,
-		ResourceID resourceId,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
-		FatalErrorHandler fatalErrorHandler) throws Exception {
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String webInterfaceUrl) throws Exception {
 		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/627bcda6/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index b8d9f65..039e51d 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -51,6 +51,8 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
+import javax.annotation.Nullable;
+
 /**
  * Entry point for Mesos session clusters.
  */
@@ -114,13 +116,14 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 
 	@Override
 	protected ResourceManager<?> createResourceManager(
-		Configuration configuration,
-		ResourceID resourceId,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
-		FatalErrorHandler fatalErrorHandler) throws Exception {
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String webInterfaceUrl) throws Exception {
 		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/627bcda6/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index bd1f573..10ec659 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -97,7 +97,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			highAvailabilityServices,
 			heartbeatServices,
 			metricRegistry,
-			this);
+			this,
+			null);
 
 		jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer);
 
@@ -272,7 +273,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 		HighAvailabilityServices highAvailabilityServices,
 		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry,
-		FatalErrorHandler fatalErrorHandler) throws Exception;
+		FatalErrorHandler fatalErrorHandler,
+		@Nullable String webInterfaceUrl) throws Exception;
 
 	protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/627bcda6/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 27ddf49..0628281 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -120,7 +120,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			highAvailabilityServices,
 			heartbeatServices,
 			metricRegistry,
-			this);
+			this,
+			dispatcherRestEndpoint.getRestAddress());
 
 		dispatcher = createDispatcher(
 			configuration,
@@ -238,5 +239,6 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 		HighAvailabilityServices highAvailabilityServices,
 		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry,
-		FatalErrorHandler fatalErrorHandler) throws Exception;
+		FatalErrorHandler fatalErrorHandler,
+		@Nullable String webInterfaceUrl) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/627bcda6/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index e7c9816..8d01196 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 
+import javax.annotation.Nullable;
+
 /**
  * Entry point for the standalone session cluster.
  */
@@ -52,7 +54,8 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler) throws Exception {
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String webInterfaceUrl) throws Exception {
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/627bcda6/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index c900c83..5a71f41 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -100,6 +102,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	private final YarnConfiguration yarnConfig;
 
+	@Nullable
+	private final String webInterfaceUrl;
+
 	/** Client to communicate with the Resource Manager (YARN's master). */
 	private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
 
@@ -123,7 +128,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			SlotManager slotManager,
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
-			FatalErrorHandler fatalErrorHandler) {
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String webInterfaceUrl) {
 		super(
 			rpcService,
 			resourceManagerEndpointId,
@@ -153,36 +159,50 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		}
 		yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
 		numPendingContainerRequests = 0;
+
+		this.webInterfaceUrl = webInterfaceUrl;
 	}
 
-	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() {
-		AMRMClientAsync<AMRMClient.ContainerRequest> rmc = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
-		rmc.init(yarnConfig);
-		rmc.start();
-		try {
-			//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
-			Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
-			//TODO: the third paramter should be the webmonitor address
-			rmc.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
-		} catch (Exception e) {
-			log.info("registerApplicationMaster fail", e);
-		}
-		return rmc;
+	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
+			YarnConfiguration yarnConfiguration,
+			int yarnHeartbeatIntervalMillis,
+			@Nullable String webInterfaceUrl) throws Exception {
+		AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
+			yarnHeartbeatIntervalMillis,
+			this);
+
+		resourceManagerClient.init(yarnConfiguration);
+		resourceManagerClient.start();
+
+		//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
+		Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
+
+		resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, webInterfaceUrl);
+
+		return resourceManagerClient;
 	}
 
-	protected NMClient createAndStartNodeManagerClient() {
+	protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
 		// create the client to communicate with the node managers
-		NMClient nmc = NMClient.createNMClient();
-		nmc.init(yarnConfig);
-		nmc.start();
-		nmc.cleanupRunningContainersOnStop(true);
-		return nmc;
+		NMClient nodeManagerClient = NMClient.createNMClient();
+		nodeManagerClient.init(yarnConfiguration);
+		nodeManagerClient.start();
+		nodeManagerClient.cleanupRunningContainersOnStop(true);
+		return nodeManagerClient;
 	}
 
 	@Override
 	protected void initialize() throws ResourceManagerException {
-		resourceManagerClient = createAndStartResourceManagerClient();
-		nodeManagerClient = createAndStartNodeManagerClient();
+		try {
+			resourceManagerClient = createAndStartResourceManagerClient(
+				yarnConfig,
+				yarnHeartbeatIntervalMillis,
+				webInterfaceUrl);
+		} catch (Exception e) {
+			throw new ResourceManagerException("Could not start resource manager client.", e);
+		}
+
+		nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
 	}
 
 	@Override
@@ -222,7 +242,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 		// first, de-register from YARN
 		FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
-		log.info("Unregistering application from the YARN Resource Manager");
+		log.info("Unregister application from the YARN Resource Manager");
+
 		try {
 			resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
 		} catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/627bcda6/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index e1efb54..78013f7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -41,6 +41,8 @@ import org.apache.flink.yarn.YarnResourceManager;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -79,7 +81,8 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler) throws Exception {
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String webInterfaceUrl) throws Exception {
 		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
@@ -99,7 +102,8 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 			rmRuntimeServices.getSlotManager(),
 			metricRegistry,
 			rmRuntimeServices.getJobLeaderIdService(),
-			fatalErrorHandler);
+			fatalErrorHandler,
+			webInterfaceUrl);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/627bcda6/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 042644b..96fba34 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -39,6 +39,8 @@ import org.apache.flink.yarn.YarnResourceManager;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -69,7 +71,8 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler) throws Exception {
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String webInterfaceUrl) throws Exception {
 		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
@@ -89,7 +92,8 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			rmRuntimeServices.getSlotManager(),
 			metricRegistry,
 			rmRuntimeServices.getJobLeaderIdService(),
-			fatalErrorHandler);
+			fatalErrorHandler,
+			webInterfaceUrl);
 	}
 
 	public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/flink/blob/627bcda6/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 39ddfa7..252b3a8 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -73,6 +74,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
@@ -149,11 +152,23 @@ public class YarnResourceManagerTest extends TestLogger {
 				MetricRegistry metricRegistry,
 				JobLeaderIdService jobLeaderIdService,
 				FatalErrorHandler fatalErrorHandler,
+				@Nullable String webInterfaceUrl,
 				AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient,
 				NMClient mockNMClient) {
-			super(rpcService, resourceManagerEndpointId, resourceId, flinkConfig, env,
-					resourceManagerConfiguration, highAvailabilityServices, heartbeatServices,
-					slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler);
+			super(
+				rpcService,
+				resourceManagerEndpointId,
+				resourceId,
+				flinkConfig,
+				env,
+				resourceManagerConfiguration,
+				highAvailabilityServices,
+				heartbeatServices,
+				slotManager,
+				metricRegistry,
+				jobLeaderIdService,
+				fatalErrorHandler,
+				webInterfaceUrl);
 			this.mockNMClient = mockNMClient;
 			this.mockResourceManagerClient = mockResourceManagerClient;
 		}
@@ -167,12 +182,15 @@ public class YarnResourceManagerTest extends TestLogger {
 		}
 
 		@Override
-		protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() {
+		protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
+				YarnConfiguration yarnConfiguration,
+				int yarnHeartbeatIntervalMillis,
+				@Nullable String webInteraceUrl) {
 			return mockResourceManagerClient;
 		}
 
 		@Override
-		protected NMClient createAndStartNodeManagerClient() {
+		protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
 			return mockNMClient;
 		}
 	}
@@ -231,9 +249,9 @@ public class YarnResourceManagerTest extends TestLogger {
 							rmServices.metricRegistry,
 							rmServices.jobLeaderIdService,
 							fatalErrorHandler,
+							null,
 							mockResourceManagerClient,
-							mockNMClient
-					);
+							mockNMClient);
 		}
 
 		/**


[6/6] flink git commit: [hotfix] Speed up RecoveryITCase

Posted by tr...@apache.org.
[hotfix] Speed up RecoveryITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/331ce82c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/331ce82c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/331ce82c

Branch: refs/heads/master
Commit: 331ce82c7fa3e58e0445c046db5e977455f3340e
Parents: 401d006
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 24 23:29:53 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 14:32:25 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/RecoveryITCase.scala   | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/331ce82c/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 4fc4042..71d2ee9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
@@ -61,9 +62,6 @@ class RecoveryITCase(_system: ActorSystem)
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
     config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
-    config.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay")
-    config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1)
-    config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, heartbeatTimeout)
     new TestingCluster(config)
   }
 
@@ -86,12 +84,12 @@ class RecoveryITCase(_system: ActorSystem)
         ResultPartitionType.PIPELINED)
 
       val executionConfig = new ExecutionConfig()
-      executionConfig.setNumberOfExecutionRetries(1);
+      executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0))
 
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
       jobGraph.setExecutionConfig(executionConfig)
 
-      val cluster = createTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 s")
+      val cluster = createTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "100 ms")
       cluster.start()
 
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -135,12 +133,12 @@ class RecoveryITCase(_system: ActorSystem)
       receiver.setSlotSharingGroup(sharingGroup)
 
       val executionConfig = new ExecutionConfig()
-      executionConfig.setNumberOfExecutionRetries(1);
+      executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0))
 
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
       jobGraph.setExecutionConfig(executionConfig)
 
-      val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s")
+      val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "100 ms")
       cluster.start()
 
       val jmGateway = cluster.getLeaderGateway(1 seconds)
@@ -184,12 +182,12 @@ class RecoveryITCase(_system: ActorSystem)
       receiver.setSlotSharingGroup(sharingGroup)
 
       val executionConfig = new ExecutionConfig()
-      executionConfig.setNumberOfExecutionRetries(1);
+      executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0))
 
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
       jobGraph.setExecutionConfig(executionConfig)
 
-      val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s")
+      val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "100 ms")
       cluster.start()
 
       val jmGateway = cluster.getLeaderGateway(1 seconds)


[2/6] flink git commit: [FLINK-8087] Decouple Slot from AllocatedSlot

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 2e6558a..586f51b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.LogicalSlot;
@@ -39,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
@@ -285,7 +284,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
 		final TestingSlotOwner slotOwner = new TestingSlotOwner();
 		slotOwner.setReturnAllocatedSlotConsumer(
-			(Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
+			(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
 
 		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
 		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@@ -366,7 +365,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(2);
 		final TestingSlotOwner slotOwner = new TestingSlotOwner();
 		slotOwner.setReturnAllocatedSlotConsumer(
-			(Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
+			(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
 
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
 		final SimpleSlot[] slots = new SimpleSlot[parallelism];
@@ -448,8 +447,11 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		TaskManagerLocation location = new TaskManagerLocation(
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
 
-		AllocatedSlot slot = new AllocatedSlot(
-				new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, taskManager);
+		SimpleSlotContext slot = new SimpleSlotContext(
+			new AllocationID(),
+			location,
+			0,
+			taskManager);
 
 		return new SimpleSlot(slot, slotOwner, 0);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index 4ce3f9d..3c8d994 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -111,28 +111,28 @@ public class ExecutionGraphStopTest extends TestLogger {
 
 		// deploy source 1
 		for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) {
-			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway);
 			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
 			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy source 2
 		for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) {
-			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway);
 			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
 			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy non-source 1
 		for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
-			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway);
 			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
 			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy non-source 2
 		for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
-			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway);
 			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
 			ev.getCurrentExecutionAttempt().deploy();
 		}
@@ -164,7 +164,7 @@ public class ExecutionGraphStopTest extends TestLogger {
 		when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
 				.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
+		final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(gateway);
 
 		exec.tryAssignResource(slot);
 		exec.deploy();

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 42a63ec..06ffaa0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
@@ -49,7 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -240,19 +239,20 @@ public class ExecutionGraphTestUtils {
 	//  Mocking Slots
 	// ------------------------------------------------------------------------
 
-	public static SimpleSlot createMockSimpleSlot(JobID jid, TaskManagerGateway gateway) {
+	public static SimpleSlot createMockSimpleSlot(TaskManagerGateway gateway) {
 		final TaskManagerLocation location = new TaskManagerLocation(
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
 
-		final AllocatedSlot allocatedSlot = new AllocatedSlot(
+		final SimpleSlotContext allocatedSlot = new SimpleSlotContext(
 				new AllocationID(),
-				jid,
 				location,
 				0,
-				ResourceProfile.UNKNOWN,
 				gateway);
 
-		return new SimpleSlot(allocatedSlot, mock(SlotOwner.class), 0);
+		return new SimpleSlot(
+			allocatedSlot,
+			mock(SlotOwner.class),
+			0);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index c6fb836..71d6f51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -81,7 +81,6 @@ public class ExecutionTest extends TestLogger {
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
-			new JobID(),
 			slotOwner,
 			new LocalTaskManagerLocation(),
 			0,
@@ -121,7 +120,6 @@ public class ExecutionTest extends TestLogger {
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
-			new JobID(),
 			slotOwner,
 			new LocalTaskManagerLocation(),
 			0,
@@ -171,7 +169,6 @@ public class ExecutionTest extends TestLogger {
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
-			new JobID(),
 			slotOwner,
 			new LocalTaskManagerLocation(),
 			0,
@@ -285,11 +282,12 @@ public class ExecutionTest extends TestLogger {
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
-			new JobID(),
 			slotOwner,
 			new LocalTaskManagerLocation(),
 			0,
-			new SimpleAckingTaskManagerGateway());
+			new SimpleAckingTaskManagerGateway(),
+			null,
+			null);
 
 		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
 		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 1b8daca..cd613f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -18,36 +18,42 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+
 import scala.concurrent.ExecutionContext;
 
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
 @SuppressWarnings("serial")
 public class ExecutionVertexCancelTest extends TestLogger {
 
@@ -134,7 +140,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					executionContext, 2);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			vertex.deployToSlot(slot);
 
@@ -202,7 +208,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					2);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			vertex.deployToSlot(slot);
 
@@ -262,7 +268,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					1);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -302,7 +308,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					1);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -350,7 +356,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					1);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -383,7 +389,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			final ActorGateway gateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), 0);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -458,7 +464,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			// the scheduler (or any caller) needs to know that the slot should be released
 			try {
 				Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot();
 
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -501,7 +507,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 				setVertexState(vertex, ExecutionState.CANCELING);
 
 				Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot();
 
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -517,7 +523,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 						AkkaUtils.getDefaultTimeout());
 
 				Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot();
 
 				setVertexResource(vertex, slot);
 				setVertexState(vertex, ExecutionState.CANCELING);

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 973c7d4..7f97d12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
@@ -67,7 +67,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
@@ -104,7 +104,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
@@ -146,7 +146,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleActorGateway(TestingUtils.defaultExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -191,7 +191,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleFailingActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -221,7 +221,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleFailingActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -265,7 +265,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			vertex.deployToSlot(slot);
@@ -310,7 +310,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 						context,
 						2)));
 
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -372,7 +372,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 		result.getPartitions()[0].addConsumer(mockEdge, 0);
 
 		AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
-		when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
+		when(allocatedSlot.getAllocationId()).thenReturn(new AllocationID());
 
 		LogicalSlot slot = mock(LogicalSlot.class);
 		when(slot.getAllocationId()).thenReturn(new AllocationID());

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 15d021a..98f7259 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -37,7 +36,8 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -233,8 +233,8 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		//  - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted
 		//  - exposing test methods in the ExecutionVertex leads to undesirable setters 
 
-		AllocatedSlot slot = new AllocatedSlot(
-				new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+		SlotContext slot = new SimpleSlotContext(
+				new AllocationID(), location, 0, mock(TaskManagerGateway.class));
 
 		SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 2941739..9310912 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -57,7 +57,7 @@ public class ExecutionVertexSchedulingTest {
 
 			// a slot than cannot be deployed to
 			final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 			
 			slot.releaseInstanceSlot();
 			assertTrue(slot.isReleased());
@@ -89,7 +89,7 @@ public class ExecutionVertexSchedulingTest {
 
 			// a slot than cannot be deployed to
 			final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			slot.releaseInstanceSlot();
 			assertTrue(slot.isReleased());
@@ -126,7 +126,7 @@ public class ExecutionVertexSchedulingTest {
 
 			final Instance instance = getInstance(new ActorTaskManagerGateway(
 				new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			Scheduler scheduler = mock(Scheduler.class);
 			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 14e0e66..9a19d24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph.utils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -29,7 +28,8 @@ import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 
-	private final ArrayDeque<AllocatedSlot> slots;
+	private final ArrayDeque<SlotContext> slots;
 
 	public SimpleSlotProvider(JobID jobId, int numSlots) {
 		this(jobId, numSlots, new SimpleAckingTaskManagerGateway());
@@ -60,12 +60,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 		this.slots = new ArrayDeque<>(numSlots);
 
 		for (int i = 0; i < numSlots; i++) {
-			AllocatedSlot as = new AllocatedSlot(
+			SimpleSlotContext as = new SimpleSlotContext(
 					new AllocationID(),
-					jobId,
 					new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
 					0,
-					ResourceProfile.UNKNOWN,
 					taskManagerGateway);
 			slots.add(as);
 		}
@@ -76,7 +74,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 			ScheduledUnit task,
 			boolean allowQueued,
 			Collection<TaskManagerLocation> preferredLocations) {
-		final AllocatedSlot slot;
+		final SlotContext slot;
 
 		synchronized (slots) {
 			if (slots.isEmpty()) {
@@ -98,7 +96,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 	@Override
 	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
 		synchronized (slots) {
-			slots.add(slot.getAllocatedSlot());
+			slots.add(slot.getSlotContext());
 		}
 		return CompletableFuture.completedFuture(true);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
index 0e4bfc0..bc396c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -28,10 +33,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
-public class AllocatedSlotsTest {
+public class AllocatedSlotsTest extends TestLogger {
 
 	@Test
 	public void testOperations() throws Exception {
@@ -39,12 +42,13 @@ public class AllocatedSlotsTest {
 
 		final AllocationID allocation1 = new AllocationID();
 		final SlotPoolGateway.SlotRequestID slotRequestID = new SlotPoolGateway.SlotRequestID();
-		final ResourceID resource1 = new ResourceID("resource1");
-		final Slot slot1 = createSlot(resource1, allocation1);
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		final ResourceID resource1 = taskManagerLocation.getResourceID();
+		final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation);
 
 		allocatedSlots.add(slotRequestID, slot1);
 
-		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
 		assertTrue(allocatedSlots.containResource(resource1));
 
 		assertEquals(slot1, allocatedSlots.get(allocation1));
@@ -53,12 +57,12 @@ public class AllocatedSlotsTest {
 
 		final AllocationID allocation2 = new AllocationID();
 		final SlotPoolGateway.SlotRequestID slotRequestID2 = new SlotPoolGateway.SlotRequestID();
-		final Slot slot2 = createSlot(resource1, allocation2);
+		final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation);
 
 		allocatedSlots.add(slotRequestID2, slot2);
 
-		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
 		assertTrue(allocatedSlots.containResource(resource1));
 
 		assertEquals(slot1, allocatedSlots.get(allocation1));
@@ -68,14 +72,15 @@ public class AllocatedSlotsTest {
 
 		final AllocationID allocation3 = new AllocationID();
 		final SlotPoolGateway.SlotRequestID slotRequestID3 = new SlotPoolGateway.SlotRequestID();
-		final ResourceID resource2 = new ResourceID("resource2");
-		final Slot slot3 = createSlot(resource2, allocation3);
+		final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
+		final ResourceID resource2 = taskManagerLocation2.getResourceID();
+		final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2);
 
 		allocatedSlots.add(slotRequestID3, slot3);
 
-		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
 		assertTrue(allocatedSlots.containResource(resource1));
 		assertTrue(allocatedSlots.containResource(resource2));
 
@@ -86,11 +91,11 @@ public class AllocatedSlotsTest {
 		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
 		assertEquals(3, allocatedSlots.size());
 
-		allocatedSlots.remove(slot2);
+		allocatedSlots.remove(slot2.getAllocationId());
 
-		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+		assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
 		assertTrue(allocatedSlots.containResource(resource1));
 		assertTrue(allocatedSlots.containResource(resource2));
 
@@ -101,11 +106,11 @@ public class AllocatedSlotsTest {
 		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
 		assertEquals(2, allocatedSlots.size());
 
-		allocatedSlots.remove(slot1);
+		allocatedSlots.remove(slot1.getAllocationId());
 
-		assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
+		assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
 		assertFalse(allocatedSlots.containResource(resource1));
 		assertTrue(allocatedSlots.containResource(resource2));
 
@@ -116,11 +121,11 @@ public class AllocatedSlotsTest {
 		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
 		assertEquals(1, allocatedSlots.size());
 
-		allocatedSlots.remove(slot3);
+		allocatedSlots.remove(slot3.getAllocationId());
 
-		assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
-		assertFalse(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
+		assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+		assertFalse(allocatedSlots.contains(slot3.getAllocationId()));
 		assertFalse(allocatedSlots.containResource(resource1));
 		assertFalse(allocatedSlots.containResource(resource2));
 
@@ -132,13 +137,13 @@ public class AllocatedSlotsTest {
 		assertEquals(0, allocatedSlots.size());
 	}
 
-	private Slot createSlot(final ResourceID resourceId, final AllocationID allocationId) {
-		AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
-		Slot slot = mock(Slot.class);
-		when(slot.getTaskManagerID()).thenReturn(resourceId);
-		when(slot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
-
-		when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(allocationId);
-		return slot;
+	private AllocatedSlot createSlot(final AllocationID allocationId, final TaskManagerLocation taskManagerLocation) {
+		return new AllocatedSlot(
+			allocationId,
+			taskManagerLocation,
+			0,
+			ResourceProfile.UNKNOWN,
+			new SimpleAckingTaskManagerGateway(),
+			new DummySlotOwner());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
index 4ed88c4..9ede899 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,7 +36,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class AvailableSlotsTest {
+public class AvailableSlotsTest extends TestLogger {
 
 	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
 
@@ -57,27 +58,27 @@ public class AvailableSlotsTest {
 		availableSlots.add(slot3, 3L);
 
 		assertEquals(3, availableSlots.size());
-		assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
-		assertTrue(availableSlots.contains(slot2.getSlotAllocationId()));
-		assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+		assertTrue(availableSlots.contains(slot1.getAllocationId()));
+		assertTrue(availableSlots.contains(slot2.getAllocationId()));
+		assertTrue(availableSlots.contains(slot3.getAllocationId()));
 		assertTrue(availableSlots.containsTaskManager(resource1));
 		assertTrue(availableSlots.containsTaskManager(resource2));
 
 		availableSlots.removeAllForTaskManager(resource1);
 
 		assertEquals(1, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
-		assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
-		assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+		assertFalse(availableSlots.contains(slot1.getAllocationId()));
+		assertFalse(availableSlots.contains(slot2.getAllocationId()));
+		assertTrue(availableSlots.contains(slot3.getAllocationId()));
 		assertFalse(availableSlots.containsTaskManager(resource1));
 		assertTrue(availableSlots.containsTaskManager(resource2));
 
 		availableSlots.removeAllForTaskManager(resource2);
 
 		assertEquals(0, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
-		assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
-		assertFalse(availableSlots.contains(slot3.getSlotAllocationId()));
+		assertFalse(availableSlots.contains(slot1.getAllocationId()));
+		assertFalse(availableSlots.contains(slot2.getAllocationId()));
+		assertFalse(availableSlots.contains(slot3.getAllocationId()));
 		assertFalse(availableSlots.containsTaskManager(resource1));
 		assertFalse(availableSlots.containsTaskManager(resource2));
 	}
@@ -92,7 +93,7 @@ public class AvailableSlotsTest {
 		availableSlots.add(slot1, 1L);
 
 		assertEquals(1, availableSlots.size());
-		assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
+		assertTrue(availableSlots.contains(slot1.getAllocationId()));
 		assertTrue(availableSlots.containsTaskManager(resource1));
 
 		assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, null));
@@ -100,7 +101,7 @@ public class AvailableSlotsTest {
 		SlotAndLocality slotAndLocality = availableSlots.poll(DEFAULT_TESTING_PROFILE, null);
 		assertEquals(slot1, slotAndLocality.slot());
 		assertEquals(0, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
+		assertFalse(availableSlots.contains(slot1.getAllocationId()));
 		assertFalse(availableSlots.containsTaskManager(resource1));
 	}
 
@@ -112,10 +113,10 @@ public class AvailableSlotsTest {
 
 		return new AllocatedSlot(
 			new AllocationID(),
-			new JobID(),
 			mockTaskManagerLocation,
 			0,
 			DEFAULT_TESTING_PROFILE,
-			mockTaskManagerGateway);
+			mockTaskManagerGateway,
+			new DummySlotOwner());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 5b85f72..229237d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -18,17 +18,22 @@
 
 package org.apache.flink.runtime.instance;
 
-import static org.junit.Assert.*;
-
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
 import org.junit.Test;
 
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Tests for the {@link Instance} class.
  */
@@ -53,10 +58,10 @@ public class InstanceTest {
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());
 
-			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot();
+			SimpleSlot slot2 = instance.allocateSimpleSlot();
+			SimpleSlot slot3 = instance.allocateSimpleSlot();
+			SimpleSlot slot4 = instance.allocateSimpleSlot();
 
 			assertNotNull(slot1);
 			assertNotNull(slot2);
@@ -69,7 +74,7 @@ public class InstanceTest {
 					slot3.getSlotNumber() + slot4.getSlotNumber());
 
 			// no more slots
-			assertNull(instance.allocateSimpleSlot(new JobID()));
+			assertNull(instance.allocateSimpleSlot());
 			try {
 				instance.returnAllocatedSlot(slot2);
 				fail("instance accepted a non-cancelled slot.");
@@ -118,9 +123,9 @@ public class InstanceTest {
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 
-			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot();
+			SimpleSlot slot2 = instance.allocateSimpleSlot();
+			SimpleSlot slot3 = instance.allocateSimpleSlot();
 
 			instance.markDead();
 
@@ -154,9 +159,9 @@ public class InstanceTest {
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 
-			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot();
+			SimpleSlot slot2 = instance.allocateSimpleSlot();
+			SimpleSlot slot3 = instance.allocateSimpleSlot();
 
 			instance.cancelAndReleaseAllSlots();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 1e2b6af..5104e48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
@@ -34,7 +33,12 @@ import org.junit.Test;
 
 import java.util.Collections;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the allocation, properties, and release of shared slots.
@@ -46,7 +50,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void allocateAndReleaseEmptySlot() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vertexId = new JobVertexID();
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(vertexId);
@@ -62,7 +65,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(2, instance.getNumberOfAvailableSlots());
 			
 			// allocate a shared slot
-			SharedSlot slot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot slot = instance.allocateSharedSlot(assignment);
 			assertEquals(2, instance.getTotalNumberOfSlots());
 			assertEquals(1, instance.getNumberOfAllocatedSlots());
 			assertEquals(1, instance.getNumberOfAvailableSlots());
@@ -110,7 +113,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void allocateSimpleSlotsAndReleaseFromRoot() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid1 = new JobVertexID();
 			JobVertexID vid2 = new JobVertexID();
 			JobVertexID vid3 = new JobVertexID();
@@ -122,7 +124,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			// allocate a series of sub slots
 
@@ -134,7 +136,6 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, sub1.getNumberLeaves());
 			assertEquals(vid1, sub1.getGroupID());
 			assertEquals(instance.getTaskManagerID(), sub1.getTaskManagerID());
-			assertEquals(jobId, sub1.getJobID());
 			assertEquals(sharedSlot, sub1.getParent());
 			assertEquals(sharedSlot, sub1.getRoot());
 			assertEquals(0, sub1.getRootSlotNumber());
@@ -153,7 +154,6 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, sub2.getNumberLeaves());
 			assertEquals(vid2, sub2.getGroupID());
 			assertEquals(instance.getTaskManagerID(), sub2.getTaskManagerID());
-			assertEquals(jobId, sub2.getJobID());
 			assertEquals(sharedSlot, sub2.getParent());
 			assertEquals(sharedSlot, sub2.getRoot());
 			assertEquals(0, sub2.getRootSlotNumber());
@@ -172,7 +172,6 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, sub3.getNumberLeaves());
 			assertEquals(vid3, sub3.getGroupID());
 			assertEquals(instance.getTaskManagerID(), sub3.getTaskManagerID());
-			assertEquals(jobId, sub3.getJobID());
 			assertEquals(sharedSlot, sub3.getParent());
 			assertEquals(sharedSlot, sub3.getRoot());
 			assertEquals(0, sub3.getRootSlotNumber());
@@ -192,7 +191,6 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, sub4.getNumberLeaves());
 			assertEquals(vid4, sub4.getGroupID());
 			assertEquals(instance.getTaskManagerID(), sub4.getTaskManagerID());
-			assertEquals(jobId, sub4.getJobID());
 			assertEquals(sharedSlot, sub4.getParent());
 			assertEquals(sharedSlot, sub4.getRoot());
 			assertEquals(0, sub4.getRootSlotNumber());
@@ -235,7 +233,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void allocateSimpleSlotsAndReleaseFromLeaves() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid1 = new JobVertexID();
 			JobVertexID vid2 = new JobVertexID();
 			JobVertexID vid3 = new JobVertexID();
@@ -246,7 +243,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			// allocate a series of sub slots
 
@@ -320,7 +317,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void allocateAndReleaseInMixedOrder() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid1 = new JobVertexID();
 			JobVertexID vid2 = new JobVertexID();
 			JobVertexID vid3 = new JobVertexID();
@@ -331,7 +327,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			// allocate a series of sub slots
 
@@ -427,7 +423,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 			
 			// get the first simple slot
 			SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
@@ -563,7 +559,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			// get the first simple slot
 			SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
@@ -607,7 +603,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void testImmediateReleaseOneLevel() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid = new JobVertexID();
 
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
@@ -615,7 +610,7 @@ public class SharedSlotsTest extends TestLogger {
 
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid);
 			sub.releaseInstanceSlot();
@@ -635,7 +630,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void testImmediateReleaseTwoLevel() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid = new JobVertexID();
 			JobVertex vertex = new JobVertex("vertex", vid);
 			
@@ -647,7 +641,7 @@ public class SharedSlotsTest extends TestLogger {
 			
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, constraint);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 42cbbbf..6d572ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -34,7 +33,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class SimpleSlotTest extends TestLogger {
+public class SimpleSlotTest extends  TestLogger {
 
 	@Test
 	public void testStateTransitions() {
@@ -137,6 +136,6 @@ public class SimpleSlotTest extends TestLogger {
 			hardwareDescription,
 			1);
 
-		return instance.allocateSimpleSlot(new JobID());
+		return instance.allocateSimpleSlot();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 8875e00..5d82f47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -36,6 +36,9 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.clock.Clock;
 import org.apache.flink.runtime.util.clock.SystemClock;
@@ -158,7 +161,7 @@ public class SlotPoolRpcTest extends TestLogger {
 
 			assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
 
-			slotPoolGateway.cancelSlotAllocation(requestId).get();
+			slotPoolGateway.cancelSlotRequest(requestId).get();
 
 			assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
 		} finally {
@@ -202,7 +205,7 @@ public class SlotPoolRpcTest extends TestLogger {
 
 			assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
 
-			slotPoolGateway.cancelSlotAllocation(requestId).get();
+			slotPoolGateway.cancelSlotRequest(requestId).get();
 			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
 		} finally {
 			RpcUtils.terminateRpcEndpoint(pool, timeout);
@@ -252,17 +255,22 @@ public class SlotPoolRpcTest extends TestLogger {
 			}
 
 			AllocationID allocationId = allocationIdFuture.get();
-			ResourceID resourceID = ResourceID.generate();
-			AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationId, jid, DEFAULT_TESTING_PROFILE);
-			slotPoolGateway.registerTaskManager(resourceID).get();
+			final SlotOffer slotOffer = new SlotOffer(
+				allocationId,
+				0,
+				DEFAULT_TESTING_PROFILE);
+			final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
 
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
 
 			assertTrue(pool.containsAllocatedSlot(allocationId).get());
 
-			pool.cancelSlotAllocation(requestId).get();
+			pool.cancelSlotRequest(requestId).get();
 
 			assertFalse(pool.containsAllocatedSlot(allocationId).get());
 			assertTrue(pool.containsAvailableSlot(allocationId).get());
@@ -351,14 +359,14 @@ public class SlotPoolRpcTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID slotRequestId) {
+		public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
 			final Consumer<SlotRequestID> currentCancelSlotAllocationConsumer = cancelSlotAllocationConsumer;
 
 			if (currentCancelSlotAllocationConsumer != null) {
 				currentCancelSlotAllocationConsumer.accept(slotRequestId);
 			}
 
-			return super.cancelSlotAllocation(slotRequestId);
+			return super.cancelSlotRequest(slotRequestId);
 		}
 
 		CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 450d377..9d90a12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.instance;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -74,10 +76,17 @@ public class SlotPoolTest extends TestLogger {
 
 	private JobID jobId;
 
+	private TaskManagerLocation taskManagerLocation;
+
+	private TaskManagerGateway taskManagerGateway;
+
 	@Before
 	public void setUp() throws Exception {
 		this.rpcService = new TestingRpcService();
 		this.jobId = new JobID();
+
+		taskManagerLocation = new LocalTaskManagerLocation();
+		taskManagerGateway = new SimpleAckingTaskManagerGateway();
 	}
 
 	@After
@@ -92,8 +101,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPoolGateway.registerTaskManager(resourceID);
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
 			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
@@ -104,14 +112,17 @@ public class SlotPoolTest extends TestLogger {
 
 			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequest.getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
 			assertTrue(future.isDone());
 			assertTrue(slot.isAlive());
-			assertEquals(resourceID, slot.getTaskManagerLocation().getResourceID());
-			assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocationId()), slot);
+			assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
 		} finally {
 			slotPool.shutDown();
 		}
@@ -124,8 +135,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPool.registerTaskManager(resourceID);
+			slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
@@ -139,8 +149,12 @@ public class SlotPoolTest extends TestLogger {
 
 			final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
 
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequests.get(0).getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 			assertTrue(future1.isDone());
@@ -158,7 +172,7 @@ public class SlotPoolTest extends TestLogger {
 			assertTrue(slot2.isAlive());
 			assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
 			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
-			assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocationId()), slot2);
+			assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
 		} finally {
 			slotPool.shutDown();
 		}
@@ -171,8 +185,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPoolGateway.registerTaskManager(resourceID);
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future1.isDone());
@@ -182,8 +195,12 @@ public class SlotPoolTest extends TestLogger {
 
 			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequest.getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 			assertTrue(future1.isDone());
@@ -214,8 +231,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPoolGateway.registerTaskManager(resourceID);
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future.isDone());
@@ -225,29 +241,36 @@ public class SlotPoolTest extends TestLogger {
 
 			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequest.getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			final TaskManagerLocation invalidTaskManagerLocation = new LocalTaskManagerLocation();
+
 			// slot from unregistered resource
-			AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertFalse(slotPoolGateway.offerSlot(invalid).get());
+			assertFalse(slotPoolGateway.offerSlot(invalidTaskManagerLocation, taskManagerGateway, slotOffer).get());
 
-			AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
+			final SlotOffer nonRequestedSlotOffer = new SlotOffer(
+				new AllocationID(),
+				0,
+				DEFAULT_TESTING_PROFILE);
 
 			// we'll also accept non requested slots
-			assertTrue(slotPoolGateway.offerSlot(notRequested).get());
-
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer).get());
 
 			// accepted slot
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 			LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(slot.isAlive());
 
 			// duplicated offer with using slot
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 			assertTrue(slot.isAlive());
 
 			// duplicated offer with free slot
 			slot.releaseSlot();
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 		} finally {
 			slotPool.shutDown();
 		}
@@ -261,8 +284,8 @@ public class SlotPoolTest extends TestLogger {
 
 		final SlotPool slotPool = new SlotPool(rpcService, jobId) {
 			@Override
-			public void returnAllocatedSlot(Slot slot) {
-				super.returnAllocatedSlot(slot);
+			public void returnAllocatedSlot(SlotContext allocatedSlot) {
+				super.returnAllocatedSlot(allocatedSlot);
 
 				slotReturnFuture.complete(true);
 			}
@@ -270,8 +293,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPoolGateway.registerTaskManager(resourceID);
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
@@ -282,14 +304,18 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequest.getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 			assertTrue(future1.isDone());
 			assertFalse(future2.isDone());
 
-			slotPoolGateway.releaseTaskManager(resourceID);
+			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
 
 			// wait until the slot has been returned
 			slotReturnFuture.get();
@@ -378,24 +404,4 @@ public class SlotPoolTest extends TestLogger {
 
 		return slotPool.getSelfGateway(SlotPoolGateway.class);
 	}
-
-	static AllocatedSlot createAllocatedSlot(
-			final ResourceID resourceId,
-			final AllocationID allocationId,
-			final JobID jobId,
-			final ResourceProfile resourceProfile) {
-		TaskManagerLocation mockTaskManagerLocation = mock(TaskManagerLocation.class);
-		when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
-
-		TaskManagerGateway mockTaskManagerGateway = mock(TaskManagerGateway.class);
-
-		return new AllocatedSlot(
-			allocationId,
-			jobId,
-			mockTaskManagerLocation,
-			0,
-			resourceProfile,
-			mockTaskManagerGateway);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
index dca47d3..28cab72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
@@ -49,14 +48,12 @@ public class SlotSharingGroupAssignmentTest extends TestLogger {
 		final int numberSlots = 2;
 		final JobVertexID sourceId = new JobVertexID();
 		final JobVertexID sinkId = new JobVertexID();
-		final JobID jobId = new JobID();
 
 		for (int i = 0; i < numberTaskManagers; i++) {
 			final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 1000);
 
 			for (int j = 0; j < numberSlots; j++) {
 				final SharedSlot slot = new SharedSlot(
-					jobId,
 					mock(SlotOwner.class),
 					taskManagerLocation,
 					j,

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index 3f267ac..d40ff61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -90,10 +90,10 @@ public class CoLocationConstraintTest {
 			Instance instance1 = SchedulerTestUtils.getRandomInstance(2);
 			Instance instance2 = SchedulerTestUtils.getRandomInstance(2);
 			
-			SharedSlot slot1_1 = instance1.allocateSharedSlot(jid, assignment);
-			SharedSlot slot1_2 = instance1.allocateSharedSlot(jid, assignment);
-			SharedSlot slot2_1 = instance2.allocateSharedSlot(jid, assignment);
-			SharedSlot slot2_2 = instance2.allocateSharedSlot(jid, assignment);
+			SharedSlot slot1_1 = instance1.allocateSharedSlot(assignment);
+			SharedSlot slot1_2 = instance1.allocateSharedSlot(assignment);
+			SharedSlot slot2_1 = instance2.allocateSharedSlot(assignment);
+			SharedSlot slot2_2 = instance2.allocateSharedSlot(assignment);
 			
 			// constraint is still completely unassigned
 			assertFalse(constraint.isAssigned());

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
new file mode 100644
index 0000000..6894542
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * SlotOwner implementation used for testing purposes only.
+ */
+public class DummySlotOwner implements SlotOwner {
+	@Override
+	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+		return CompletableFuture.completedFuture(false);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 9c12fff..01f445b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -58,12 +58,14 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.net.InetAddress;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
@@ -169,7 +171,7 @@ public class TaskExecutorITCase extends TestLogger {
 		when(jmGateway.getHostname()).thenReturn(jmAddress);
 		when(jmGateway.offerSlots(
 			eq(taskManagerResourceId),
-			any(Iterable.class),
+			any(Collection.class),
 			any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 		when(jmGateway.getFencingToken()).thenReturn(jobMasterId);
 
@@ -214,7 +216,7 @@ public class TaskExecutorITCase extends TestLogger {
 
 			verify(jmGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
 				eq(taskManagerResourceId),
-				(Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)),
+				(Collection<SlotOffer>)argThat(Matchers.contains(slotOffer)),
 				any(Time.class));
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6372792..29d07fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -852,7 +852,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 		when(jobMasterGateway.offerSlots(
 			any(ResourceID.class),
-			any(Iterable.class),
+			any(Collection.class),
 			any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
@@ -904,7 +904,7 @@ public class TaskExecutorTest extends TestLogger {
 			// the job leader should get the allocation id offered
 			verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
 					any(ResourceID.class),
-					(Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+					(Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)),
 					any(Time.class));
 
 			// check if a concurrent error occurred
@@ -975,7 +975,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 		when(jobMasterGateway.offerSlots(
-				any(ResourceID.class), any(Iterable.class), any(Time.class)))
+				any(ResourceID.class), any(Collection.class), any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
@@ -1315,7 +1315,7 @@ public class TaskExecutorTest extends TestLogger {
 			when(
 				jobMasterGateway.offerSlots(
 					any(ResourceID.class),
-					any(Iterable.class),
+					any(Collection.class),
 					any(Time.class)))
 				.thenReturn(offerResultFuture);
 
@@ -1323,7 +1323,7 @@ public class TaskExecutorTest extends TestLogger {
 			// been properly started. This will also offer the slots to the job master
 			jobLeaderService.addJob(jobId, jobManagerAddress);
 
-			verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Iterable.class), any(Time.class));
+			verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Collection.class), any(Time.class));
 
 			// submit the task without having acknowledge the offered slots
 			tmGateway.submitTask(tdd, jobMasterId, timeout);


[5/6] flink git commit: [FLINK-8089] Also check for other pending slot requests in offerSlot

Posted by tr...@apache.org.
[FLINK-8089] Also check for other pending slot requests in offerSlot

Not only check for a slot request with the right allocation id but also check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots.

This closes #5090.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/401d0065
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/401d0065
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/401d0065

Branch: refs/heads/master
Commit: 401d006516caa2a9d8289e760ccd3a9c564bc795
Parents: bc1c375
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 13 15:42:07 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 14:32:25 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/AllocatedSlot.java   | 18 ++---
 .../apache/flink/runtime/instance/SlotPool.java | 41 +++++------
 .../flink/runtime/instance/SlotPoolTest.java    | 73 ++++++++++++++++++++
 3 files changed, 103 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/401d0065/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index a3f98f1..97be592 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -108,15 +108,6 @@ public class AllocatedSlot {
 	}
 
 	/**
-	 * Gets the number of the slot.
-	 *
-	 * @return The number of the slot on the TaskManager.
-	 */
-	public int getPhysicalSlotNumber() {
-		return physicalSlotNumber;
-	}
-
-	/**
 	 * Gets the resource profile of the slot.
 	 *
 	 * @return The resource profile of the slot.
@@ -146,6 +137,15 @@ public class AllocatedSlot {
 	}
 
 	/**
+	 * Returns true if this slot is being used (i.e. a logical slot is allocated from this slot).
+	 *
+	 * @return true if a logical slot is allocated from this slot, otherwise false
+	 */
+	public boolean isUsed() {
+		return logicalSlotReference.get() != null;
+	}
+
+	/**
 	 * Triggers the release of the logical slot.
 	 */
 	public void triggerLogicalSlotRelease() {

http://git-wip-us.apache.org/repos/asf/flink/blob/401d0065/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index a72f57b..68f5be6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -281,7 +281,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
 
 		if (allocatedSlot != null) {
-			internalReturnAllocatedSlot(allocatedSlot);
+			if (allocatedSlot.releaseLogicalSlot()) {
+				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
+			} else {
+				throw new RuntimeException("Could not release allocated slot " + allocatedSlot + '.');
+			}
 		} else {
 			log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId);
 		}
@@ -342,9 +346,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 				try {
 					return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN);
 				} catch (SlotException e) {
-					internalReturnAllocatedSlot(allocatedSlot);
-
-					throw new CompletionException("Could not allocate a logical simple slot.", e);
+					throw new CompletionException("Could not allocate a logical simple slot from allocate slot " +
+						allocatedSlot + '.', e);
 				}
 			});
 	}
@@ -464,6 +467,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		Preconditions.checkNotNull(e);
 
 		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
+			LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
 			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
 		}
 	}
@@ -497,28 +501,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
-	 * slot can be reused by other pending requests if the resource profile matches.n
+	 * Tries to fulfill a pending slot request with the given allocated slot, or adds the
+	 * allocated slot to the set of available slots if no matching request is available.
 	 *
 	 * @param allocatedSlot which shall be returned
 	 */
-	private void internalReturnAllocatedSlot(AllocatedSlot allocatedSlot) {
-		if (allocatedSlot.releaseLogicalSlot()) {
+	private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
+		Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");
 
-			final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
+		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
 
-			if (pendingRequest != null) {
-				LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
-					pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
+		if (pendingRequest != null) {
+			LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
-				allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
-				pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
-			} else {
-				LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
-				availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
-			}
+			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
+			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
 		} else {
-			LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
+			LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 		}
 	}
 
@@ -643,7 +644,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			// we were actually not waiting for this:
 			//   - could be that this request had been fulfilled
 			//   - we are receiving the slots from TaskManagers after becoming leaders
-			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
+			tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
 		}
 
 		// we accepted the request in any case. slot will be released after it idled for

http://git-wip-us.apache.org/repos/asf/flink/blob/401d0065/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index ec20f6b..1af9cce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -50,7 +51,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
@@ -383,6 +386,76 @@ public class SlotPoolTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that unused offered slots are directly used to fulfill pending slot
+	 * requests.
+	 *
+	 * <p>See FLINK-8089
+	 */
+	@Test
+	public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+		final JobMasterId jobMasterId = JobMasterId.generate();
+		final String jobMasterAddress = "foobar";
+		final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+
+		resourceManagerGateway.setRequestSlotConsumer(
+			(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+		final SlotRequestID slotRequestId1 = new SlotRequestID();
+		final SlotRequestID slotRequestId2 = new SlotRequestID();
+
+		try {
+			slotPool.start(jobMasterId, jobMasterAddress);
+
+			final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+			final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
+
+			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+			CompletableFuture<LogicalSlot> slotFuture1 = slotPoolGateway.allocateSlot(
+				slotRequestId1,
+				scheduledUnit,
+				ResourceProfile.UNKNOWN,
+				Collections.emptyList(),
+				timeout);
+
+			// wait for the first slot request
+			final AllocationID allocationId = allocationIdFuture.get();
+
+			CompletableFuture<LogicalSlot> slotFuture2 = slotPoolGateway.allocateSlot(
+				slotRequestId2,
+				scheduledUnit,
+				ResourceProfile.UNKNOWN,
+				Collections.emptyList(),
+				timeout);
+
+			slotPoolGateway.cancelSlotRequest(slotRequestId1);
+
+			try {
+				// this should fail with a CancellationException
+				slotFuture1.get();
+				fail("The first slot future should have failed because it was cancelled.");
+			} catch (ExecutionException ee) {
+				assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException);
+			}
+
+			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
+
+			// the slot offer should fulfill the second slot request
+			assertEquals(allocationId, slotFuture2.get().getAllocationId());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+		}
+	}
+
 	private static ResourceManagerGateway createResourceManagerGatewayMock() {
 		ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
 		when(resourceManagerGateway


[3/6] flink git commit: [FLINK-8087] Decouple Slot from AllocatedSlot

Posted by tr...@apache.org.
[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext which is an abstraction for the SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by the
SlotPool.

This closes #5088.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a569f38f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a569f38f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a569f38f

Branch: refs/heads/master
Commit: a569f38f16186518b53461842d37b09fb1df45e9
Parents: 627bcda
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 24 18:03:49 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 14:32:24 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/AllocatedSlot.java   | 241 ++++++++++++++
 .../apache/flink/runtime/instance/Instance.java |  27 +-
 .../flink/runtime/instance/SharedSlot.java      | 106 +++---
 .../flink/runtime/instance/SimpleSlot.java      |  60 ++--
 .../org/apache/flink/runtime/instance/Slot.java |  51 ++-
 .../apache/flink/runtime/instance/SlotPool.java | 323 ++++++++++---------
 .../flink/runtime/instance/SlotPoolGateway.java |  18 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |   6 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java | 169 ----------
 .../jobmanager/slots/SimpleSlotContext.java     |  68 ++++
 .../jobmanager/slots/SlotAndLocality.java       |   1 +
 .../runtime/jobmanager/slots/SlotContext.java   |  61 ++++
 .../runtime/jobmanager/slots/SlotException.java |  40 +++
 .../flink/runtime/jobmaster/JobMaster.java      |  24 +-
 .../runtime/jobmaster/JobMasterGateway.java     |   2 +-
 .../ExecutionGraphDeploymentTest.java           |  13 +-
 .../ExecutionGraphSchedulingTest.java           |  14 +-
 .../executiongraph/ExecutionGraphStopTest.java  |  10 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  14 +-
 .../runtime/executiongraph/ExecutionTest.java   |   8 +-
 .../ExecutionVertexCancelTest.java              |  44 +--
 .../ExecutionVertexDeploymentTest.java          |  18 +-
 .../ExecutionVertexLocalityTest.java            |   8 +-
 .../ExecutionVertexSchedulingTest.java          |   6 +-
 .../utils/SimpleSlotProvider.java               |  14 +-
 .../runtime/instance/AllocatedSlotsTest.java    |  75 +++--
 .../runtime/instance/AvailableSlotsTest.java    |  33 +-
 .../flink/runtime/instance/InstanceTest.java    |  39 ++-
 .../flink/runtime/instance/SharedSlotsTest.java |  34 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |   5 +-
 .../flink/runtime/instance/SlotPoolRpcTest.java |  30 +-
 .../flink/runtime/instance/SlotPoolTest.java    | 116 +++----
 .../SlotSharingGroupAssignmentTest.java         |   3 -
 .../scheduler/CoLocationConstraintTest.java     |   8 +-
 .../jobmanager/slots/DummySlotOwner.java        |  33 ++
 .../taskexecutor/TaskExecutorITCase.java        |   6 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  10 +-
 37 files changed, 1031 insertions(+), 707 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
new file mode 100644
index 0000000..7036044
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SlotException;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@code AllocatedSlot} represents a slot that the JobManager allocated from a TaskManager.
+ * It represents a slice of allocated resources from the TaskManager.
+ * 
+ * <p>To allocate an {@code AllocatedSlot}, the JobManager requests a slot from the ResourceManager. The
+ * ResourceManager picks (or starts) a TaskManager that will then allocate the slot to the
+ * JobManager and notify the JobManager.
+ * 
+ * <p>Note: Prior to the resource management changes introduced in FLIP-6 (Flink Improvement Proposal 6),
+ * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
+ * JobManager. All slots had a default unknown resource profile. 
+ */
+public class AllocatedSlot implements SlotContext {
+
+	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
+	private final AllocationID slotAllocationId;
+
+	/** The location information of the TaskManager to which this slot belongs */
+	private final TaskManagerLocation taskManagerLocation;
+
+	/** The resource profile that the slot provides */
+	private final ResourceProfile resourceProfile;
+
+	/** RPC gateway to call the TaskManager that holds this slot */
+	private final TaskManagerGateway taskManagerGateway;
+
+	/** The number of the slot on the TaskManager to which this slot belongs. Purely informational. */
+	private final int physicalSlotNumber;
+
+	private final SlotOwner slotOwner;
+
+	private final AtomicReference<LogicalSlot> logicalSlotReference;
+
+	// ------------------------------------------------------------------------
+
+	public AllocatedSlot(
+			AllocationID slotAllocationId,
+			TaskManagerLocation location,
+			int physicalSlotNumber,
+			ResourceProfile resourceProfile,
+			TaskManagerGateway taskManagerGateway,
+			SlotOwner slotOwner) {
+		this.slotAllocationId = checkNotNull(slotAllocationId);
+		this.taskManagerLocation = checkNotNull(location);
+		this.physicalSlotNumber = physicalSlotNumber;
+		this.resourceProfile = checkNotNull(resourceProfile);
+		this.taskManagerGateway = checkNotNull(taskManagerGateway);
+		this.slotOwner = checkNotNull(slotOwner);
+
+		logicalSlotReference = new AtomicReference<>(null);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
+	 * 
+	 * @return The ID under which the slot is allocated
+	 */
+	public AllocationID getAllocationId() {
+		return slotAllocationId;
+	}
+
+	/**
+	 * Gets the ID of the TaskManager on which this slot was allocated.
+	 * 
+	 * <p>This is equivalent to {@code getTaskManagerLocation().getResourceID()}.
+	 * 
+	 * @return This slot's TaskManager's ID.
+	 */
+	public ResourceID getTaskManagerId() {
+		return getTaskManagerLocation().getResourceID();
+	}
+
+	/**
+	 * Gets the number of the slot.
+	 *
+	 * @return The number of the slot on the TaskManager.
+	 */
+	public int getPhysicalSlotNumber() {
+		return physicalSlotNumber;
+	}
+
+	/**
+	 * Gets the resource profile of the slot.
+	 *
+	 * @return The resource profile of the slot.
+	 */
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
+	 *
+	 * @return The location info of the TaskManager that offers this slot
+	 */
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	/**
+	 * Gets the actor gateway that can be used to send messages to the TaskManager.
+	 * <p>
+	 * This method should be removed once the new interface-based RPC abstraction is in place
+	 *
+	 * @return The actor gateway that can be used to send messages to the TaskManager.
+	 */
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
+	}
+
+	/**
+	 * Triggers the release of the logical slot.
+	 */
+	public void triggerLogicalSlotRelease() {
+		final LogicalSlot logicalSlot = logicalSlotReference.get();
+
+		if (logicalSlot != null) {
+			logicalSlot.releaseSlot();
+		}
+	}
+
+	/**
+	 * Releases the logical slot.
+	 *
+	 * @return true if the logical slot could be released, false otherwise.
+	 */
+	public boolean releaseLogicalSlot() {
+		final LogicalSlot logicalSlot = logicalSlotReference.get();
+
+		if (logicalSlot != null) {
+			if (logicalSlot instanceof Slot) {
+				final Slot slot = (Slot) logicalSlot;
+				if (slot.markReleased()) {
+					logicalSlotReference.set(null);
+					return true;
+				}
+			} else {
+				throw new RuntimeException("Unsupported logical slot type encountered " + logicalSlot.getClass());
+			}
+
+		}
+
+		return false;
+	}
+
+	/**
+	 * Allocates a logical {@link SimpleSlot}.
+	 *
+	 * @return an allocated logical simple slot
+	 * @throws SlotException if we could not allocate a simple slot
+	 */
+	public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException {
+
+		final SimpleSlot simpleSlot = new SimpleSlot(this, slotOwner, physicalSlotNumber);
+
+		if (logicalSlotReference.compareAndSet(null, simpleSlot)) {
+			simpleSlot.setLocality(locality);
+			return simpleSlot;
+		} else {
+			throw new SlotException("Could not allocate logical simple slot because the allocated slot is already used.");
+		}
+	}
+
+	/**
+	 * Allocates a logical {@link SharedSlot}.
+	 *
+	 * @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong
+	 * @return an allocated logical shared slot
+	 * @throws SlotException if we could not allocate a shared slot
+	 */
+	public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {
+		final SharedSlot sharedSlot = new SharedSlot(this, slotOwner, slotSharingGroupAssignment);
+
+		if (logicalSlotReference.compareAndSet(null, sharedSlot)) {
+
+
+			return sharedSlot;
+		} else {
+			throw new SlotException("Could not allocate logical shared slot because the allocated slot is already used.");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This always returns a reference hash code.
+	 */
+	@Override
+	public final int hashCode() {
+		return super.hashCode();
+	}
+
+	/**
+	 * This always checks based on reference equality.
+	 */
+	@Override
+	public final boolean equals(Object obj) {
+		return this == obj;
+	}
+
+	@Override
+	public String toString() {
+		return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index d099f6a..54c8971 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
@@ -210,19 +209,13 @@ public class Instance implements SlotOwner {
 	 * Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot
 	 * is available at the moment.
 	 *
-	 * @param jobID The ID of the job that the slot is allocated for.
-	 *
 	 * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the
 	 *         TaskManager instance has no more slots available.
 	 *
 	 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the
 	 *                               slot is allocated. 
 	 */
-	public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {
-		if (jobID == null) {
-			throw new IllegalArgumentException();
-		}
-
+	public SimpleSlot allocateSimpleSlot() throws InstanceDiedException {
 		synchronized (instanceLock) {
 			if (isDead) {
 				throw new InstanceDiedException(this);
@@ -233,7 +226,7 @@ public class Instance implements SlotOwner {
 				return null;
 			}
 			else {
-				SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway);
+				SimpleSlot slot = new SimpleSlot(this, location, nextSlot, taskManagerGateway);
 				allocatedSlots.add(slot);
 				return slot;
 			}
@@ -244,7 +237,6 @@ public class Instance implements SlotOwner {
 	 * Allocates a shared slot on this TaskManager instance. This method returns {@code null}, if no slot
 	 * is available at the moment. The shared slot will be managed by the given  SlotSharingGroupAssignment.
 	 *
-	 * @param jobID The ID of the job that the slot is allocated for.
 	 * @param sharingGroupAssignment The assignment group that manages this shared slot.
 	 *
 	 * @return A shared slot that represents a task slot on this TaskManager instance and can hold other
@@ -252,13 +244,8 @@ public class Instance implements SlotOwner {
 	 *
 	 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the slot is allocated. 
 	 */
-	public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment)
-			throws InstanceDiedException
-	{
-		// the slot needs to be in the returned to taskManager state
-		if (jobID == null) {
-			throw new IllegalArgumentException();
-		}
+	public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment sharingGroupAssignment)
+			throws InstanceDiedException {
 
 		synchronized (instanceLock) {
 			if (isDead) {
@@ -271,7 +258,11 @@ public class Instance implements SlotOwner {
 			}
 			else {
 				SharedSlot slot = new SharedSlot(
-						jobID, this, location, nextSlot, taskManagerGateway, sharingGroupAssignment);
+					this,
+					location,
+					nextSlot,
+					taskManagerGateway,
+					sharingGroupAssignment);
 				allocatedSlots.add(slot);
 				return slot;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 2ce4fc3..8637159 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
+
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -44,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
  * synchronization.
  */
-public class SharedSlot extends Slot {
+public class SharedSlot extends Slot implements LogicalSlot {
 
 	/** The assignment group os shared slots that manages the availability and release of the slots */
 	private final SlotSharingGroupAssignment assignmentGroup;
@@ -52,6 +55,8 @@ public class SharedSlot extends Slot {
 	/** The set os sub-slots allocated from this shared slot */
 	private final Set<Slot> subSlots;
 
+	private final CompletableFuture<?> cancellationFuture = new CompletableFuture<>();
+
 	// ------------------------------------------------------------------------
 	//  Old Constructors (prior FLIP-6)
 	// ------------------------------------------------------------------------
@@ -60,7 +65,6 @@ public class SharedSlot extends Slot {
 	 * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group.
 	 * This constructor is used to create a slot directly from an instance. 
 	 *
-	 * @param jobID The ID of the job that the slot is created for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the slot.
@@ -68,18 +72,17 @@ public class SharedSlot extends Slot {
 	 * @param assignmentGroup The assignment group that this shared slot belongs to.
 	 */
 	public SharedSlot(
-			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			SlotOwner owner, TaskManagerLocation location, int slotNumber,
 			TaskManagerGateway taskManagerGateway,
 			SlotSharingGroupAssignment assignmentGroup) {
 
-		this(jobID, owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
+		this(owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
 	}
 
 	/**
 	 * Creates a new shared slot that has is a sub-slot of the given parent shared slot, and that belongs
 	 * to the given task group.
 	 *
-	 * @param jobID The ID of the job that the slot is created for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the slot.
@@ -89,7 +92,6 @@ public class SharedSlot extends Slot {
 	 * @param groupId The assignment group of this slot.
 	 */
 	public SharedSlot(
-			JobID jobID,
 			SlotOwner owner,
 			TaskManagerLocation location,
 			int slotNumber,
@@ -98,7 +100,7 @@ public class SharedSlot extends Slot {
 			@Nullable SharedSlot parent,
 			@Nullable AbstractID groupId) {
 
-		super(jobID, owner, location, slotNumber, taskManagerGateway, parent, groupId);
+		super(owner, location, slotNumber, taskManagerGateway, parent, groupId);
 
 		this.assignmentGroup = checkNotNull(assignmentGroup);
 		this.subSlots = new HashSet<Slot>();
@@ -112,38 +114,23 @@ public class SharedSlot extends Slot {
 	 * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group.
 	 * This constructor is used to create a slot directly from an instance.
 	 * 
-	 * @param allocatedSlot The allocated slot that this slot represents.
+	 * @param slotContext The slot context of this shared slot
 	 * @param owner The component from which this slot is allocated.
 	 * @param assignmentGroup The assignment group that this shared slot belongs to.
 	 */
-	public SharedSlot(AllocatedSlot allocatedSlot, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) {
-		this(allocatedSlot, owner, allocatedSlot.getSlotNumber(), assignmentGroup, null, null);
-	}
-
-	/**
-	 * Creates a new shared slot that is a sub-slot of the given parent shared slot, and that belongs
-	 * to the given task group.
-	 * 
-	 * @param parent The parent slot of this slot.
-	 * @param owner The component from which this slot is allocated.
-	 * @param slotNumber The number of the slot.
-	 * @param assignmentGroup The assignment group that this shared slot belongs to.
-	 * @param groupId The assignment group of this slot.
-	 */
-	public SharedSlot(
-			SharedSlot parent, SlotOwner owner, int slotNumber,
-			SlotSharingGroupAssignment assignmentGroup,
-			AbstractID groupId) {
-
-		this(parent.getAllocatedSlot(), owner, slotNumber, assignmentGroup, parent, groupId);
+	public SharedSlot(SlotContext slotContext, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) {
+		this(slotContext, owner, slotContext.getPhysicalSlotNumber(), assignmentGroup, null, null);
 	}
 
 	private SharedSlot(
-			AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber,
+			SlotContext slotInformation,
+			SlotOwner owner,
+			int slotNumber,
 			SlotSharingGroupAssignment assignmentGroup,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupId) {
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupId) {
 
-		super(allocatedSlot, owner, slotNumber, parent, groupId);
+		super(slotInformation, owner, slotNumber, parent, groupId);
 
 		this.assignmentGroup = checkNotNull(assignmentGroup);
 		this.subSlots = new HashSet<Slot>();
@@ -186,14 +173,44 @@ public class SharedSlot extends Slot {
 	public boolean hasChildren() {
 		return subSlots.size() > 0;
 	}
-	
+
 	@Override
-	public void releaseInstanceSlot() {
+	public boolean tryAssignPayload(Payload payload) {
+		throw new UnsupportedOperationException("Cannot assign an execution attempt id to a shared slot.");
+	}
+
+	@Nullable
+	@Override
+	public Payload getPayload() {
+		return null;
+	}
+
+	@Override
+	public CompletableFuture<?> releaseSlot() {
+		cancellationFuture.completeExceptionally(new FlinkException("Shared slot " + this + " is being released."));
+
 		assignmentGroup.releaseSharedSlot(this);
-		
+
 		if (!(isReleased() && subSlots.isEmpty())) {
 			throw new IllegalStateException("Bug: SharedSlot is not empty and released after call to releaseSlot()");
 		}
+
+		return CompletableFuture.completedFuture(null);
+	}
+
+	@Override
+	public void releaseInstanceSlot() {
+		releaseSlot();
+	}
+
+	@Override
+	public int getPhysicalSlotNumber() {
+		return getRootSlotNumber();
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return getSlotContext().getAllocationId();
 	}
 
 	/**
@@ -222,8 +239,12 @@ public class SharedSlot extends Slot {
 	SimpleSlot allocateSubSlot(AbstractID groupId) {
 		if (isAlive()) {
 			SimpleSlot slot = new SimpleSlot(
-					getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), 
-					getTaskManagerGateway(), this, groupId);
+				getOwner(),
+				getTaskManagerLocation(),
+				subSlots.size(),
+				getTaskManagerGateway(),
+				this,
+				groupId);
 			subSlots.add(slot);
 			return slot;
 		}
@@ -244,8 +265,13 @@ public class SharedSlot extends Slot {
 	SharedSlot allocateSharedSlot(AbstractID groupId){
 		if (isAlive()) {
 			SharedSlot slot = new SharedSlot(
-					getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), 
-					getTaskManagerGateway(), assignmentGroup, this, groupId);
+				getOwner(),
+				getTaskManagerLocation(),
+				subSlots.size(),
+				getTaskManagerGateway(),
+				assignmentGroup,
+				this,
+				groupId);
 			subSlots.add(slot);
 			return slot;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 0c9e11c..d397c08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -64,23 +63,21 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	/**
 	 * Creates a new simple slot that stands alone and does not belong to shared slot.
 	 * 
-	 * @param jobID The ID of the job that the slot is allocated for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the task slot on the instance.
 	 * @param taskManagerGateway The gateway to communicate with the TaskManager of this slot
 	 */
 	public SimpleSlot(
-			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			SlotOwner owner, TaskManagerLocation location, int slotNumber,
 			TaskManagerGateway taskManagerGateway) {
-		this(jobID, owner, location, slotNumber, taskManagerGateway, null, null);
+		this(owner, location, slotNumber, taskManagerGateway, null, null);
 	}
 
 	/**
 	 * Creates a new simple slot that belongs to the given shared slot and
 	 * is identified by the given ID.
 	 *
-	 * @param jobID The ID of the job that the slot is allocated for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the simple slot in its parent shared slot.
@@ -89,15 +86,25 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	 * @param groupID The ID that identifies the group that the slot belongs to.
 	 */
 	public SimpleSlot(
-			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			SlotOwner owner,
+			TaskManagerLocation location,
+			int slotNumber,
 			TaskManagerGateway taskManagerGateway,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
-
-		super(parent != null ?
-				parent.getAllocatedSlot() :
-				new AllocatedSlot(NO_ALLOCATION_ID, jobID, location, slotNumber,
-						ResourceProfile.UNKNOWN, taskManagerGateway),
-				owner, slotNumber, parent, groupID);
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupID) {
+
+		super(
+			parent != null ?
+				parent.getSlotContext() :
+				new SimpleSlotContext(
+					NO_ALLOCATION_ID,
+					location,
+					slotNumber,
+					taskManagerGateway),
+			owner,
+			slotNumber,
+			parent,
+			groupID);
 	}
 
 	// ------------------------------------------------------------------------
@@ -107,12 +114,11 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	/**
 	 * Creates a new simple slot that stands alone and does not belong to shared slot.
 	 *
-	 * @param allocatedSlot The allocated slot that this slot represents.
+	 * @param slotContext The slot context of this simple slot
 	 * @param owner The component from which this slot is allocated.
-	 * @param slotNumber The number of the task slot on the instance.
 	 */
-	public SimpleSlot(AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber) {
-		this(allocatedSlot, owner, slotNumber, null, null);
+	public SimpleSlot(SlotContext slotContext, SlotOwner owner, int slotNumber) {
+		this(slotContext, owner, slotNumber, null, null);
 	}
 
 	/**
@@ -121,27 +127,29 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	 *
 	 * @param parent The parent shared slot.
 	 * @param owner The component from which this slot is allocated.
-	 * @param slotNumber The number of the simple slot in its parent shared slot.
 	 * @param groupID The ID that identifies the group that the slot belongs to.
 	 */
 	public SimpleSlot(SharedSlot parent, SlotOwner owner, int slotNumber, AbstractID groupID) {
-		this(parent.getAllocatedSlot(), owner, slotNumber, parent, groupID);
+		this(parent.getSlotContext(), owner, slotNumber, parent, groupID);
 	}
 	
 	/**
 	 * Creates a new simple slot that belongs to the given shared slot and
 	 * is identified by the given ID..
 	 *
-	 * @param allocatedSlot The allocated slot that this slot represents.
+	 * @param slotContext The slot context of this simple slot
 	 * @param owner The component from which this slot is allocated.
 	 * @param slotNumber The number of the simple slot in its parent shared slot.
 	 * @param parent The parent shared slot.
 	 * @param groupID The ID that identifies the group that the slot belongs to.
 	 */
 	private SimpleSlot(
-			AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
-		super(allocatedSlot, owner, slotNumber, parent, groupID);
+			SlotContext slotContext,
+			SlotOwner owner,
+			int slotNumber,
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupID) {
+		super(slotContext, owner, slotNumber, parent, groupID);
 	}
 
 	// ------------------------------------------------------------------------
@@ -263,7 +271,7 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 
 	@Override
 	public AllocationID getAllocationId() {
-		return getAllocatedSlot().getSlotAllocationId();
+		return getSlotContext().getAllocationId();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 804682b..6262c9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -66,8 +65,8 @@ public abstract class Slot {
 
 	// ------------------------------------------------------------------------
 
-	/** The allocated slot that this slot represents. */
-	private final AllocatedSlot allocatedSlot;
+	/** Context of this logical slot. */
+	private final SlotContext slotContext;
 
 	/** The owner of this slot - the slot was taken from that owner and must be disposed to it */
 	private final SlotOwner owner;
@@ -80,7 +79,6 @@ public abstract class Slot {
 	@Nullable
 	private final AbstractID groupID;
 
-	/** The number of the slot on which the task is deployed */
 	private final int slotNumber;
 
 	/** The state of the vertex, only atomically updated */
@@ -93,7 +91,6 @@ public abstract class Slot {
 	 * 
 	 * <p>This is the old way of constructing slots, prior to the FLIP-6 resource management refactoring.
 	 * 
-	 * @param jobID The ID of the job that this slot is allocated for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of this slot.
@@ -103,7 +100,6 @@ public abstract class Slot {
 	 *                if the slot does not belong to any task group.   
 	 */
 	protected Slot(
-			JobID jobID,
 			SlotOwner owner,
 			TaskManagerLocation location,
 			int slotNumber,
@@ -113,12 +109,11 @@ public abstract class Slot {
 
 		checkArgument(slotNumber >= 0);
 
-		this.allocatedSlot = new AllocatedSlot(
+		// create a simple slot context
+		this.slotContext = new SimpleSlotContext(
 			NO_ALLOCATION_ID,
-			jobID,
 			location,
 			slotNumber,
-			ResourceProfile.UNKNOWN,
 			taskManagerGateway);
 
 		this.owner = checkNotNull(owner);
@@ -130,7 +125,7 @@ public abstract class Slot {
 	/**
 	 * Base constructor for slots.
 	 *
-	 * @param allocatedSlot The allocated slot that this slot represents.
+	 * @param slotContext The slot context of this slot.
 	 * @param owner The component from which this slot is allocated.
 	 * @param slotNumber The number of this slot.
 	 * @param parent The parent slot that contains this slot. May be null, if this slot is the root.
@@ -138,12 +133,13 @@ public abstract class Slot {
 	 *                if the slot does not belong to any task group.   
 	 */
 	protected Slot(
-			AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
-
-		checkArgument(slotNumber >= 0);
+			SlotContext slotContext,
+			SlotOwner owner,
+			int slotNumber,
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupID) {
 
-		this.allocatedSlot = checkNotNull(allocatedSlot);
+		this.slotContext = checkNotNull(slotContext);
 		this.owner = checkNotNull(owner);
 		this.parent = parent; // may be null
 		this.groupID = groupID; // may be null
@@ -157,17 +153,8 @@ public abstract class Slot {
 	 * 
 	 * @return This slot's allocated slot.
 	 */
-	public AllocatedSlot getAllocatedSlot() {
-		return allocatedSlot;
-	}
-
-	/**
-	 * Returns the ID of the job this allocated slot belongs to.
-	 *
-	 * @return the ID of the job this allocated slot belongs to
-	 */
-	public JobID getJobID() {
-		return allocatedSlot.getJobID();
+	public SlotContext getSlotContext() {
+		return slotContext;
 	}
 
 	/**
@@ -176,7 +163,7 @@ public abstract class Slot {
 	 * @return The ID of the TaskManager that offers this slot
 	 */
 	public ResourceID getTaskManagerID() {
-		return allocatedSlot.getTaskManagerLocation().getResourceID();
+		return slotContext.getTaskManagerLocation().getResourceID();
 	}
 
 	/**
@@ -185,7 +172,7 @@ public abstract class Slot {
 	 * @return The location info of the TaskManager that offers this slot
 	 */
 	public TaskManagerLocation getTaskManagerLocation() {
-		return allocatedSlot.getTaskManagerLocation();
+		return slotContext.getTaskManagerLocation();
 	}
 
 	/**
@@ -196,7 +183,7 @@ public abstract class Slot {
 	 * @return The actor gateway that can be used to send messages to the TaskManager.
 	 */
 	public TaskManagerGateway getTaskManagerGateway() {
-		return allocatedSlot.getTaskManagerGateway();
+		return slotContext.getTaskManagerGateway();
 	}
 
 	/**
@@ -373,7 +360,7 @@ public abstract class Slot {
 	}
 
 	protected String hierarchy() {
-		return (getParent() != null ? getParent().hierarchy() : "") + '(' + slotNumber + ')';
+		return (getParent() != null ? getParent().hierarchy() : "") + '(' + getSlotNumber() + ')';
 	}
 
 	private static String getStateName(int state) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 771d690..2ccea75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.instance;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -29,9 +28,11 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
+import org.apache.flink.runtime.jobmanager.slots.SlotException;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -59,11 +60,11 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -249,7 +250,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		// work on all slots waiting for this connection
 		for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
-			requestSlotFromResourceManager(pendingRequest);
+			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
 		}
 
 		// all sent off
@@ -277,24 +278,23 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	@Override
-	public void returnAllocatedSlot(Slot slot) {
-		internalReturnAllocatedSlot(slot);
+	public void returnAllocatedSlot(SlotContext allocatedSlot) {
+		internalReturnAllocatedSlot(allocatedSlot.getAllocationId());
 	}
 
 	@Override
-	public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID requestId) {
+	public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId) {
 		final PendingRequest pendingRequest = removePendingRequest(requestId);
 
 		if (pendingRequest != null) {
 			failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled."));
 		} else {
-			final Slot slot = allocatedSlots.get(requestId);
+			final AllocatedSlot allocatedSlot = allocatedSlots.get(requestId);
 
-			if (slot != null) {
-				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, requestId);
-				if (slot.markCancelled()) {
-					internalReturnAllocatedSlot(slot);
-				}
+			if (allocatedSlot != null) {
+				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, requestId);
+				// TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot
+				allocatedSlot.triggerLogicalSlotRelease();
 			} else {
 				LOG.debug("There was no slot allocation with {} to be cancelled.", requestId);
 			}
@@ -312,24 +312,36 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		// (1) do we have a slot available already?
 		SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences);
 		if (slotFromPool != null) {
-			SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality());
-			allocatedSlots.add(requestId, slot);
-			return CompletableFuture.completedFuture(slot);
-		}
+			final AllocatedSlot allocatedSlot = slotFromPool.slot();
 
-		// the request will be completed by a future
-		final CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
+			final SimpleSlot simpleSlot;
+			try {
+				simpleSlot = allocatedSlot.allocateSimpleSlot(slotFromPool.locality());
+			} catch (SlotException e) {
+				availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 
-		// (2) need to request a slot
-		if (resourceManagerGateway == null) {
-			// no slot available, and no resource manager connection
-			stashRequestWaitingForResourceManager(requestId, resources, future);
-		} else {
-			// we have a resource manager connection, so let's ask it for more resources
-			requestSlotFromResourceManager(new PendingRequest(requestId, future, resources));
+				return FutureUtils.completedExceptionally(e);
+			}
+
+			allocatedSlots.add(requestId, allocatedSlot);
+			return CompletableFuture.completedFuture(simpleSlot);
 		}
 
-		return future;
+		// we have to request a new allocated slot
+		CompletableFuture<AllocatedSlot> allocatedSlotFuture = requestSlot(
+			requestId,
+			resources);
+
+		return allocatedSlotFuture.thenApply(
+			(AllocatedSlot allocatedSlot) -> {
+				try {
+					return allocatedSlot.allocateSimpleSlot(Locality.UNKNOWN);
+				} catch (SlotException e) {
+					returnAllocatedSlot(allocatedSlot);
+
+					throw new CompletionException("Could not allocate a logical simple slot.", e);
+				}
+			});
 	}
 
 	/**
@@ -354,16 +366,37 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 	}
 
+	private CompletableFuture<AllocatedSlot> requestSlot(
+		SlotRequestID slotRequestId,
+		ResourceProfile resourceProfile) {
+
+		final PendingRequest pendingRequest = new PendingRequest(
+			slotRequestId,
+			resourceProfile);
+
+		if (resourceManagerGateway == null) {
+			stashRequestWaitingForResourceManager(pendingRequest);
+		} else {
+			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
+		}
+
+		return pendingRequest.getAllocatedSlotFuture();
+	}
+
 	private void requestSlotFromResourceManager(
+			final ResourceManagerGateway resourceManagerGateway,
 			final PendingRequest pendingRequest) {
 
+		Preconditions.checkNotNull(resourceManagerGateway);
+		Preconditions.checkNotNull(pendingRequest);
+
 		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
 
 		final AllocationID allocationId = new AllocationID();
 
 		pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
 
-		pendingRequest.getFuture().whenComplete(
+		pendingRequest.getAllocatedSlotFuture().whenComplete(
 			(value, throwable) -> {
 				if (throwable != null) {
 					resourceManagerGateway.cancelSlotRequest(allocationId);
@@ -405,7 +438,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	private void slotRequestToResourceManagerFailed(SlotRequestID slotRequestID, Throwable failure) {
 		PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
 		if (request != null) {
-			request.getFuture().completeExceptionally(new NoResourceAvailableException(
+			request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
 					"No pooled slot available and request to ResourceManager for new slot failed", failure));
 		} else {
 			if (LOG.isDebugEnabled()) {
@@ -425,25 +458,22 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		Preconditions.checkNotNull(pendingRequest);
 		Preconditions.checkNotNull(e);
 
-		if (!pendingRequest.getFuture().isDone()) {
-			pendingRequest.getFuture().completeExceptionally(e);
+		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
+			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
 		}
 	}
 
-	private void stashRequestWaitingForResourceManager(
-			final SlotRequestID requestId,
-			final ResourceProfile resources,
-			final CompletableFuture<LogicalSlot> future) {
+	private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
 
 		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
-				"Adding as pending request {}",  requestId);
+				"Adding as pending request {}",  pendingRequest.getSlotRequestId());
 
-		waitingForResourceManager.put(requestId, new PendingRequest(requestId, future, resources));
+		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
 
 		scheduleRunAsync(new Runnable() {
 			@Override
 			public void run() {
-				checkTimeoutRequestWaitingForResourceManager(requestId);
+				checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId());
 			}
 		}, resourceManagerRequestsTimeout);
 	}
@@ -465,38 +495,31 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	 * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
 	 * slot can be reused by other pending requests if the resource profile matches.n
 	 *
-	 * @param slot The slot needs to be returned
+	 * @param allocationId identifying the slot which is returned
 	 */
-	private void internalReturnAllocatedSlot(Slot slot) {
-		checkNotNull(slot);
-		checkArgument(!slot.isAlive(), "slot is still alive");
-		checkArgument(slot.getOwner() == providerAndOwner, "slot belongs to the wrong pool.");
-
-		// markReleased() is an atomic check-and-set operation, so that the slot is guaranteed
-		// to be returned only once
-		if (slot.markReleased()) {
-			if (allocatedSlots.remove(slot)) {
-				// this slot allocation is still valid, use the slot to fulfill another request
-				// or make it available again
-				final AllocatedSlot taskManagerSlot = slot.getAllocatedSlot();
-				final PendingRequest pendingRequest = pollMatchingPendingRequest(taskManagerSlot);
-	
+	private void internalReturnAllocatedSlot(AllocationID allocationId) {
+		final AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationId);
+
+		if (allocatedSlot != null) {
+			if (allocatedSlot.releaseLogicalSlot()) {
+
+				final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
+
 				if (pendingRequest != null) {
 					LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
-							pendingRequest.getSlotRequestId(), taskManagerSlot.getSlotAllocationId());
+						pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
-					SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
-					allocatedSlots.add(pendingRequest.getSlotRequestId(), newSlot);
-					pendingRequest.getFuture().complete(newSlot);
+					allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
+					pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
+				} else {
+					LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+					availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 				}
-				else {
-					LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
-					availableSlots.add(taskManagerSlot, clock.relativeTimeMillis());
-				}
-			}
-			else {
-				LOG.debug("Returned slot's allocation has been failed. Dropping slot.");
+			} else {
+				LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
 			}
+		} else {
+			LOG.debug("Could not find allocated slot {}. Ignoring returning slot.", allocationId);
 		}
 	}
 
@@ -524,19 +547,26 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	@Override
-	public CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers) {
+	public CompletableFuture<Collection<SlotOffer>> offerSlots(
+			TaskManagerLocation taskManagerLocation,
+			TaskManagerGateway taskManagerGateway,
+			Collection<SlotOffer> offers) {
 		validateRunsInMainThread();
 
 		List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers = offers.stream().map(
 			offer -> {
-				CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(offer.f0).thenApply(
-					(acceptedSlot) -> {
-						if (acceptedSlot) {
-							return Optional.of(offer.f1);
-						} else {
-							return Optional.empty();
-						}
-					});
+				CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(
+					taskManagerLocation,
+					taskManagerGateway,
+					offer)
+					.thenApply(
+						(acceptedSlot) -> {
+							if (acceptedSlot) {
+								return Optional.of(offer);
+							} else {
+								return Optional.empty();
+							}
+						});
 
 				return acceptedSlotOffer;
 			}
@@ -564,20 +594,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	 * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
 	 * request waiting for this slot (maybe fulfilled by some other returned slot).
 	 *
-	 * @param slot The offered slot
+	 * @param taskManagerLocation location of the TaskManager from which the offer comes
+	 * @param taskManagerGateway TaskManager gateway
+	 * @param slotOffer the offered slot
 	 * @return True if we accept the offering
 	 */
 	@Override
-	public CompletableFuture<Boolean> offerSlot(final AllocatedSlot slot) {
+	public CompletableFuture<Boolean> offerSlot(
+			final TaskManagerLocation taskManagerLocation,
+			final TaskManagerGateway taskManagerGateway,
+			final SlotOffer slotOffer) {
 		validateRunsInMainThread();
 
 		// check if this TaskManager is valid
-		final ResourceID resourceID = slot.getTaskManagerId();
-		final AllocationID allocationID = slot.getSlotAllocationId();
+		final ResourceID resourceID = taskManagerLocation.getResourceID();
+		final AllocationID allocationID = slotOffer.getAllocationId();
 
 		if (!registeredTaskManagers.contains(resourceID)) {
 			LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
-					slot.getSlotAllocationId(), slot);
+					slotOffer.getAllocationId(), taskManagerLocation);
 			return CompletableFuture.completedFuture(false);
 		}
 
@@ -590,19 +625,26 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			return CompletableFuture.completedFuture(true);
 		}
 
+		final AllocatedSlot allocatedSlot = new AllocatedSlot(
+			slotOffer.getAllocationId(),
+			taskManagerLocation,
+			slotOffer.getSlotIndex(),
+			slotOffer.getResourceProfile(),
+			taskManagerGateway,
+			providerAndOwner);
+
 		// check whether we have request waiting for this slot
 		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
 		if (pendingRequest != null) {
 			// we were waiting for this!
-			SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN);
-			pendingRequest.getFuture().complete(resultSlot);
-			allocatedSlots.add(pendingRequest.getSlotRequestId(), resultSlot);
+			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
+			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
 		}
 		else {
 			// we were actually not waiting for this:
 			//   - could be that this request had been fulfilled
 			//   - we are receiving the slots from TaskManagers after becoming leaders
-			availableSlots.add(slot, clock.relativeTimeMillis());
+			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 		}
 
 		// we accepted the request in any case. slot will be released after it idled for
@@ -639,11 +681,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			LOG.debug("Failed available slot [{}] with ", allocationID, cause);
 		}
 		else {
-			Slot slot = allocatedSlots.remove(allocationID);
-			if (slot != null) {
+			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
+			if (allocatedSlot != null) {
 				// release the slot.
 				// since it is not in 'allocatedSlots' any more, it will be dropped o return'
-				slot.releaseInstanceSlot();
+				allocatedSlot.triggerLogicalSlotRelease();
 			}
 			else {
 				LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
@@ -681,9 +723,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		if (registeredTaskManagers.remove(resourceID)) {
 			availableSlots.removeAllForTaskManager(resourceID);
 
-			final Set<Slot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
-			for (Slot slot : allocatedSlotsForResource) {
-				slot.releaseInstanceSlot();
+			final Set<AllocatedSlot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
+			for (AllocatedSlot allocatedSlot : allocatedSlotsForResource) {
+				allocatedSlot.triggerLogicalSlotRelease();
+				// TODO: This is a work-around to mark the logical slot as released. We should split up the internalReturnAllocatedSlot method to not poll pending requests
+				allocatedSlot.releaseLogicalSlot();
 			}
 		}
 
@@ -691,18 +735,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private SimpleSlot createSimpleSlot(AllocatedSlot slot, Locality locality) {
-		SimpleSlot result = new SimpleSlot(slot, providerAndOwner, slot.getSlotNumber());
-		if (locality != null) {
-			result.setLocality(locality);
-		}
-		return result;
-	}
-
-	// ------------------------------------------------------------------------
 	//  Methods for tests
 	// ------------------------------------------------------------------------
 
@@ -736,10 +768,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	static class AllocatedSlots {
 
 		/** All allocated slots organized by TaskManager's id */
-		private final Map<ResourceID, Set<Slot>> allocatedSlotsByTaskManager;
+		private final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager;
 
 		/** All allocated slots organized by AllocationID */
-		private final DualKeyMap<AllocationID, SlotRequestID, Slot> allocatedSlotsById;
+		private final DualKeyMap<AllocationID, SlotRequestID, AllocatedSlot> allocatedSlotsById;
 
 		AllocatedSlots() {
 			this.allocatedSlotsByTaskManager = new HashMap<>(16);
@@ -749,18 +781,18 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		/**
 		 * Adds a new slot to this collection.
 		 *
-		 * @param slot The allocated slot
+		 * @param allocatedSlot The allocated slot
 		 */
-		void add(SlotRequestID slotRequestId, Slot slot) {
-			allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slotRequestId, slot);
-
-			final ResourceID resourceID = slot.getTaskManagerID();
-			Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.get(resourceID);
-			if (slotsForTaskManager == null) {
-				slotsForTaskManager = new HashSet<>();
-				allocatedSlotsByTaskManager.put(resourceID, slotsForTaskManager);
-			}
-			slotsForTaskManager.add(slot);
+		void add(SlotRequestID slotRequestId, AllocatedSlot allocatedSlot) {
+			allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot);
+
+			final ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID();
+
+			Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.computeIfAbsent(
+				resourceID,
+				resourceId -> new HashSet<>(4));
+
+			slotsForTaskManager.add(allocatedSlot);
 		}
 
 		/**
@@ -769,11 +801,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		 * @param allocationID The allocation id
 		 * @return The allocated slot, null if we can't find a match
 		 */
-		Slot get(final AllocationID allocationID) {
+		AllocatedSlot get(final AllocationID allocationID) {
 			return allocatedSlotsById.getKeyA(allocationID);
 		}
 
-		Slot get(final SlotRequestID slotRequestId) {
+		AllocatedSlot get(final SlotRequestID slotRequestId) {
 			return allocatedSlotsById.getKeyB(slotRequestId);
 		}
 
@@ -790,27 +822,20 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		/**
 		 * Remove an allocation with slot.
 		 *
-		 * @param slot The slot needs to be removed
-		 */
-		boolean remove(final Slot slot) {
-			return remove(slot.getAllocatedSlot().getSlotAllocationId()) != null;
-		}
-
-		/**
-		 * Remove an allocation with slot.
-		 *
 		 * @param slotId The ID of the slot to be removed
 		 */
-		Slot remove(final AllocationID slotId) {
-			Slot slot = allocatedSlotsById.removeKeyA(slotId);
-			if (slot != null) {
-				final ResourceID taskManagerId = slot.getTaskManagerID();
-				Set<Slot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
-				slotsForTM.remove(slot);
+		AllocatedSlot remove(final AllocationID slotId) {
+			AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(slotId);
+			if (allocatedSlot != null) {
+				final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
+				Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
+
+				slotsForTM.remove(allocatedSlot);
+
 				if (slotsForTM.isEmpty()) {
 					allocatedSlotsByTaskManager.remove(taskManagerId);
 				}
-				return slot;
+				return allocatedSlot;
 			}
 			else {
 				return null;
@@ -823,11 +848,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		 * @param resourceID The id of the TaskManager
 		 * @return Set of slots which are allocated from the same TaskManager
 		 */
-		Set<Slot> removeSlotsForTaskManager(final ResourceID resourceID) {
-			Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
+		Set<AllocatedSlot> removeSlotsForTaskManager(final ResourceID resourceID) {
+			Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
 			if (slotsForTaskManager != null) {
-				for (Slot slot : slotsForTaskManager) {
-					allocatedSlotsById.removeKeyA(slot.getAllocatedSlot().getSlotAllocationId());
+				for (AllocatedSlot allocatedSlot : slotsForTaskManager) {
+					allocatedSlotsById.removeKeyA(allocatedSlot.getAllocationId());
 				}
 				return slotsForTaskManager;
 			}
@@ -852,7 +877,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		@VisibleForTesting
-		Set<Slot> getSlotsForTaskManager(ResourceID resourceId) {
+		Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
 			if (allocatedSlotsByTaskManager.containsKey(resourceId)) {
 				return allocatedSlotsByTaskManager.get(resourceId);
 			} else {
@@ -892,7 +917,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			checkNotNull(slot);
 
 			SlotAndTimestamp previous = availableSlots.put(
-					slot.getSlotAllocationId(), new SlotAndTimestamp(slot, timestamp));
+					slot.getAllocationId(), new SlotAndTimestamp(slot, timestamp));
 
 			if (previous == null) {
 				final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
@@ -951,7 +976,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 					if (onTaskManager != null) {
 						for (AllocatedSlot candidate : onTaskManager) {
 							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
-								remove(candidate.getSlotAllocationId());
+								remove(candidate.getAllocationId());
 								return new SlotAndLocality(candidate, Locality.LOCAL);
 							}
 						}
@@ -964,7 +989,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 					if (onHost != null) {
 						for (AllocatedSlot candidate : onHost) {
 							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
-								remove(candidate.getSlotAllocationId());
+								remove(candidate.getAllocationId());
 								return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
 							}
 						}
@@ -977,7 +1002,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 				final AllocatedSlot slot = candidate.slot();
 
 				if (slot.getResourceProfile().isMatching(resourceProfile)) {
-					remove(slot.getSlotAllocationId());
+					remove(slot.getAllocationId());
 					return new SlotAndLocality(
 							slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
 				}
@@ -1002,7 +1027,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 				// remove from the base set and the by-host view
 				for (AllocatedSlot slot : slotsForTm) {
-					availableSlots.remove(slot.getSlotAllocationId());
+					availableSlots.remove(slot.getAllocationId());
 					slotsForHost.remove(slot);
 				}
 
@@ -1082,7 +1107,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		@Override
 		public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-			gateway.returnAllocatedSlot(slot);
+			gateway.returnAllocatedSlot(slot.getSlotContext());
 			return CompletableFuture.completedFuture(true);
 		}
 
@@ -1097,7 +1122,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			slotFuture.whenComplete(
 				(LogicalSlot slot, Throwable failure) -> {
 					if (failure != null) {
-						gateway.cancelSlotAllocation(requestId);
+						gateway.cancelSlotRequest(requestId);
 					}
 			});
 			return slotFuture;
@@ -1113,25 +1138,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		private final SlotRequestID slotRequestId;
 
-		private final CompletableFuture<LogicalSlot> future;
-
 		private final ResourceProfile resourceProfile;
 
+		private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;
+
 		PendingRequest(
 				SlotRequestID slotRequestId,
-				CompletableFuture<LogicalSlot> future,
 				ResourceProfile resourceProfile) {
 			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
-			this.future = Preconditions.checkNotNull(future);
 			this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+
+			allocatedSlotFuture = new CompletableFuture<>();
 		}
 
 		public SlotRequestID getSlotRequestId() {
 			return slotRequestId;
 		}
 
-		public CompletableFuture<LogicalSlot> getFuture() {
-			return future;
+		public CompletableFuture<AllocatedSlot> getAllocatedSlotFuture() {
+			return allocatedSlotFuture;
 		}
 
 		public ResourceProfile getResourceProfile() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index ad2a6a6..71de054 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -76,9 +76,15 @@ public interface SlotPoolGateway extends RpcGateway {
 
 	CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
 
-	CompletableFuture<Boolean> offerSlot(AllocatedSlot slot);
+	CompletableFuture<Boolean> offerSlot(
+		TaskManagerLocation taskManagerLocation,
+		TaskManagerGateway taskManagerGateway,
+		SlotOffer slotOffer);
 
-	CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers);
+	CompletableFuture<Collection<SlotOffer>> offerSlots(
+		TaskManagerLocation taskManagerLocation,
+		TaskManagerGateway taskManagerGateway,
+		Collection<SlotOffer> offers);
 	
 	void failAllocation(AllocationID allocationID, Exception cause);
 
@@ -93,7 +99,7 @@ public interface SlotPoolGateway extends RpcGateway {
 			Iterable<TaskManagerLocation> locationPreferences,
 			@RpcTimeout Time timeout);
 
-	void returnAllocatedSlot(Slot slot);
+	void returnAllocatedSlot(SlotContext slotInformation);
 
 	/**
 	 * Cancel a slot allocation request.
@@ -101,7 +107,7 @@ public interface SlotPoolGateway extends RpcGateway {
 	 * @param requestId identifying the slot allocation request
 	 * @return Future acknowledge if the slot allocation has been cancelled
 	 */
-	CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID requestId);
+	CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId);
 
 	/**
 	 * Request ID identifying different slot requests.

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 8857be7..a3c38e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -364,7 +364,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			Locality locality = instanceLocalityPair.getRight();
 
 			try {
-				SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId());
+				SimpleSlot slot = instanceToUse.allocateSimpleSlot();
 				
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
@@ -426,7 +426,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				JobVertexID groupID = vertex.getJobvertexId();
 				
 				// allocate a shared slot from the instance
-				SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment);
+				SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(groupAssignment);
 
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
@@ -562,7 +562,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 				
 				try {
-					SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
+					SimpleSlot newSlot = instance.allocateSimpleSlot();
 					if (newSlot != null) {
 						
 						// success, remove from the task queue and notify the future

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
deleted file mode 100644
index 4910862..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The {@code AllocatedSlot} represents a slot that the JobManager allocated from a TaskManager.
- * It represents a slice of allocated resources from the TaskManager.
- * 
- * <p>To allocate an {@code AllocatedSlot}, the requests a slot from the ResourceManager. The
- * ResourceManager picks (or starts) a TaskManager that will then allocate the slot to the
- * JobManager and notify the JobManager.
- * 
- * <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6),
- * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
- * JobManager. All slots had a default unknown resource profile. 
- */
-public class AllocatedSlot {
-
-	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
-	private final AllocationID slotAllocationId;
-
-	/** The ID of the job this slot is allocated for */
-	private final JobID jobID;
-
-	/** The location information of the TaskManager to which this slot belongs */
-	private final TaskManagerLocation taskManagerLocation;
-
-	/** The resource profile of the slot provides */
-	private final ResourceProfile resourceProfile;
-
-	/** RPC gateway to call the TaskManager that holds this slot */
-	private final TaskManagerGateway taskManagerGateway;
-
-	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
-	private final int slotNumber;
-
-	// ------------------------------------------------------------------------
-
-	public AllocatedSlot(
-			AllocationID slotAllocationId,
-			JobID jobID,
-			TaskManagerLocation location,
-			int slotNumber,
-			ResourceProfile resourceProfile,
-			TaskManagerGateway taskManagerGateway) {
-		this.slotAllocationId = checkNotNull(slotAllocationId);
-		this.jobID = checkNotNull(jobID);
-		this.taskManagerLocation = checkNotNull(location);
-		this.slotNumber = slotNumber;
-		this.resourceProfile = checkNotNull(resourceProfile);
-		this.taskManagerGateway = checkNotNull(taskManagerGateway);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
-	 * 
-	 * @return The ID under which the slot is allocated
-	 */
-	public AllocationID getSlotAllocationId() {
-		return slotAllocationId;
-	}
-
-	/**
-	 * Gets the ID of the TaskManager on which this slot was allocated.
-	 * 
-	 * <p>This is equivalent to {@link #getTaskManagerLocation()#getTaskManagerId()}.
-	 * 
-	 * @return This slot's TaskManager's ID.
-	 */
-	public ResourceID getTaskManagerId() {
-		return getTaskManagerLocation().getResourceID();
-	}
-
-	/**
-	 * Returns the ID of the job this allocated slot belongs to.
-	 *
-	 * @return the ID of the job this allocated slot belongs to
-	 */
-	public JobID getJobID() {
-		return jobID;
-	}
-
-	/**
-	 * Gets the number of the slot.
-	 *
-	 * @return The number of the slot on the TaskManager.
-	 */
-	public int getSlotNumber() {
-		return slotNumber;
-	}
-
-	/**
-	 * Gets the resource profile of the slot.
-	 *
-	 * @return The resource profile of the slot.
-	 */
-	public ResourceProfile getResourceProfile() {
-		return resourceProfile;
-	}
-
-	/**
-	 * Gets the location info of the TaskManager that offers this slot.
-	 *
-	 * @return The location info of the TaskManager that offers this slot
-	 */
-	public TaskManagerLocation getTaskManagerLocation() {
-		return taskManagerLocation;
-	}
-
-	/**
-	 * Gets the actor gateway that can be used to send messages to the TaskManager.
-	 * <p>
-	 * This method should be removed once the new interface-based RPC abstraction is in place
-	 *
-	 * @return The actor gateway that can be used to send messages to the TaskManager.
-	 */
-	public TaskManagerGateway getTaskManagerGateway() {
-		return taskManagerGateway;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This always returns a reference hash code.
-	 */
-	@Override
-	public final int hashCode() {
-		return super.hashCode();
-	}
-
-	/**
-	 * This always checks based on reference equality.
-	 */
-	@Override
-	public final boolean equals(Object obj) {
-		return this == obj;
-	}
-
-	@Override
-	public String toString() {
-		return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + slotNumber;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
new file mode 100644
index 0000000..5dccc1f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Simple implementation of the {@link SlotContext} interface for the legacy code.
+ */
+public class SimpleSlotContext implements SlotContext {
+
+	private final AllocationID allocationId;
+
+	private final TaskManagerLocation taskManagerLocation;
+
+	private final int physicalSlotNumber;
+
+	private final TaskManagerGateway taskManagerGateway;
+
+	public SimpleSlotContext(
+			AllocationID allocationId,
+			TaskManagerLocation taskManagerLocation,
+			int physicalSlotNumber,
+			TaskManagerGateway taskManagerGateway) {
+		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+		this.physicalSlotNumber = physicalSlotNumber;
+		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	@Override
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	@Override
+	public int getPhysicalSlotNumber() {
+		return physicalSlotNumber;
+	}
+
+	@Override
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
index 3fe5346..5ae057d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
+import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
new file mode 100644
index 0000000..d8a1aa4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface for the context of a logical {@link Slot}. This context contains information
+ * about the underlying allocated slot and how to communicate with the TaskManager on which
+ * it was allocated.
+ */
+public interface SlotContext {
+
+	/**
+	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
+	 *
+	 * @return The ID under which the slot is allocated
+	 */
+	AllocationID getAllocationId();
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
+	 *
+	 * @return The location info of the TaskManager that offers this slot
+	 */
+	TaskManagerLocation getTaskManagerLocation();
+
+	/**
+	 * Gets the number of the slot.
+	 *
+	 * @return The number of the slot on the TaskManager.
+	 */
+	int getPhysicalSlotNumber();
+
+	/**
+	 * Gets the actor gateway that can be used to send messages to the TaskManager.
+	 * <p>
+	 * This method should be removed once the new interface-based RPC abstraction is in place
+	 *
+	 * @return The gateway that can be used to send messages to the TaskManager.
+	 */
+	TaskManagerGateway getTaskManagerGateway();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java
new file mode 100644
index 0000000..48e7e25
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for slot related exceptions.
+ */
+public class SlotException extends FlinkException {
+	private static final long serialVersionUID = -8009227041400667546L;
+
+	public SlotException(String message) {
+		super(message);
+	}
+
+	public SlotException(Throwable cause) {
+		super(cause);
+	}
+
+	public SlotException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 687b6d1..324557f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -65,7 +65,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -109,7 +108,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -649,7 +647,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	@Override
 	public CompletableFuture<Collection<SlotOffer>> offerSlots(
 			final ResourceID taskManagerId,
-			final Iterable<SlotOffer> slots,
+			final Collection<SlotOffer> slots,
 			final Time timeout) {
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
@@ -658,27 +656,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
 		}
 
-		final JobID jid = jobGraph.getJobID();
 		final TaskManagerLocation taskManagerLocation = taskManager.f0;
 		final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
 
-		final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<>();
-
 		final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
 
-		for (SlotOffer slotOffer : slots) {
-			final AllocatedSlot slot = new AllocatedSlot(
-				slotOffer.getAllocationId(),
-				jid,
-				taskManagerLocation,
-				slotOffer.getSlotIndex(),
-				slotOffer.getResourceProfile(),
-				rpcTaskManagerGateway);
-
-			slotsAndOffers.add(new Tuple2<>(slot, slotOffer));
-		}
-
-		return slotPoolGateway.offerSlots(slotsAndOffers);
+		return slotPoolGateway.offerSlots(
+			taskManagerLocation,
+			rpcTaskManagerGateway,
+			slots);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index ad906c2..09d995e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -195,7 +195,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 	 */
 	CompletableFuture<Collection<SlotOffer>> offerSlots(
 			final ResourceID taskManagerId,
-			final Iterable<SlotOffer> slots,
+			final Collection<SlotOffer> slots,
 			@RpcTimeout final Time timeout);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index b489478..16da8e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -184,7 +184,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 			final Instance instance = getInstance(new ActorTaskManagerGateway(instanceGateway));
 
-			final SimpleSlot slot = instance.allocateSimpleSlot(jobId);
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -631,13 +631,13 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		final TaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
 
-		final SimpleSlot sourceSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+		final SimpleSlot sourceSlot1 = createSlot(localTaskManagerLocation, 0);
 
-		final SimpleSlot sourceSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+		final SimpleSlot sourceSlot2 = createSlot(localTaskManagerLocation, 1);
 
-		final SimpleSlot sinkSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+		final SimpleSlot sinkSlot1 = createSlot(localTaskManagerLocation, 0);
 
-		final SimpleSlot sinkSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+		final SimpleSlot sinkSlot2 = createSlot(localTaskManagerLocation, 1);
 
 		slotFutures.get(sourceVertexId)[0].complete(sourceSlot1);
 		slotFutures.get(sourceVertexId)[1].complete(sourceSlot2);
@@ -654,9 +654,8 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		}
 	}
 
-	private SimpleSlot createSlot(JobID jobId, TaskManagerLocation taskManagerLocation, int index) {
+	private SimpleSlot createSlot(TaskManagerLocation taskManagerLocation, int index) {
 		return new SimpleSlot(
-			jobId,
 			mock(SlotOwner.class),
 			taskManagerLocation,
 			index,


[4/6] flink git commit: [FLINK-8088] Associate logical slots with the slot request id

Posted by tr...@apache.org.
[FLINK-8088] Associate logical slots with the slot request id

Previously, logical slots like the SimpleSlot and SharedSlot were associated with the
actually allocated slot via the AllocationID. This, however, was sub-optimal because
allocated slots can be re-used to also fulfill other slot requests (logical slots).
Therefore, we should bind the logical slots to the right id with the right lifecycle,
which is the slot request id.

This closes #5089.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc1c375a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc1c375a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc1c375a

Branch: refs/heads/master
Commit: bc1c375aa061f27a2fbc5a7688b06da70fed5d20
Parents: a569f38
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 24 18:06:10 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 14:32:25 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/AllocatedSlot.java   |  66 +++++++++--
 .../apache/flink/runtime/instance/Instance.java |   9 +-
 .../flink/runtime/instance/LogicalSlot.java     |   8 ++
 .../flink/runtime/instance/SharedSlot.java      |   5 +
 .../flink/runtime/instance/SimpleSlot.java      |   6 +
 .../org/apache/flink/runtime/instance/Slot.java |   4 +-
 .../apache/flink/runtime/instance/SlotPool.java | 112 +++++++++++--------
 .../flink/runtime/instance/SlotPoolGateway.java |  14 +--
 .../flink/runtime/instance/SlotRequestID.java   |  34 ++++++
 .../jobmanager/slots/SimpleSlotContext.java     |  10 ++
 .../runtime/jobmanager/slots/SlotContext.java   |  13 ++-
 .../runtime/jobmanager/slots/SlotOwner.java     |   6 +-
 .../ExecutionGraphSchedulingTest.java           |   7 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  10 +-
 .../runtime/executiongraph/ExecutionTest.java   |  11 +-
 .../ExecutionVertexDeploymentTest.java          |   6 +-
 .../ExecutionVertexLocalityTest.java            |   9 +-
 .../utils/SimpleSlotProvider.java               |  17 ++-
 .../runtime/instance/AllocatedSlotsTest.java    |   6 +-
 .../flink/runtime/instance/SlotPoolRpcTest.java |  10 +-
 .../flink/runtime/instance/SlotPoolTest.java    |  23 ++--
 .../runtime/instance/TestingLogicalSlot.java    |  16 ++-
 .../jobmanager/slots/DummySlotOwner.java        |   4 +-
 .../jobmanager/slots/TestingSlotOwner.java      |  12 +-
 24 files changed, 289 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index 7036044..a3f98f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmanager.slots.SlotException;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -44,10 +45,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
  * JobManager. All slots had a default unknown resource profile. 
  */
-public class AllocatedSlot implements SlotContext {
+public class AllocatedSlot {
 
 	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
-	private final AllocationID slotAllocationId;
+	private final AllocationID allocationId;
 
 	/** The location information of the TaskManager to which this slot belongs */
 	private final TaskManagerLocation taskManagerLocation;
@@ -68,13 +69,13 @@ public class AllocatedSlot implements SlotContext {
 	// ------------------------------------------------------------------------
 
 	public AllocatedSlot(
-			AllocationID slotAllocationId,
+			AllocationID allocationId,
 			TaskManagerLocation location,
 			int physicalSlotNumber,
 			ResourceProfile resourceProfile,
 			TaskManagerGateway taskManagerGateway,
 			SlotOwner slotOwner) {
-		this.slotAllocationId = checkNotNull(slotAllocationId);
+		this.allocationId = checkNotNull(allocationId);
 		this.taskManagerLocation = checkNotNull(location);
 		this.physicalSlotNumber = physicalSlotNumber;
 		this.resourceProfile = checkNotNull(resourceProfile);
@@ -92,7 +93,7 @@ public class AllocatedSlot implements SlotContext {
 	 * @return The ID under which the slot is allocated
 	 */
 	public AllocationID getAllocationId() {
-		return slotAllocationId;
+		return allocationId;
 	}
 
 	/**
@@ -182,12 +183,16 @@ public class AllocatedSlot implements SlotContext {
 	/**
 	 * Allocates a logical {@link SimpleSlot}.
 	 *
+	 * @param slotRequestId identifying the corresponding slot request
+	 * @param locality specifying the locality of the allocated slot
 	 * @return an allocated logical simple slot
 	 * @throws SlotException if we could not allocate a simple slot
 	 */
-	public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException {
+	public SimpleSlot allocateSimpleSlot(SlotRequestID slotRequestId, Locality locality) throws SlotException {
+		final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
+			slotRequestId);
 
-		final SimpleSlot simpleSlot = new SimpleSlot(this, slotOwner, physicalSlotNumber);
+		final SimpleSlot simpleSlot = new SimpleSlot(allocatedSlotContext, slotOwner, physicalSlotNumber);
 
 		if (logicalSlotReference.compareAndSet(null, simpleSlot)) {
 			simpleSlot.setLocality(locality);
@@ -200,12 +205,16 @@ public class AllocatedSlot implements SlotContext {
 	/**
 	 * Allocates a logical {@link SharedSlot}.
 	 *
+	 * @param slotRequestId identifying the corresponding slot request
 	 * @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong
 	 * @return an allocated logical shared slot
 	 * @throws SlotException if we could not allocate a shared slot
 	 */
-	public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {
-		final SharedSlot sharedSlot = new SharedSlot(this, slotOwner, slotSharingGroupAssignment);
+	public SharedSlot allocateSharedSlot(SlotRequestID slotRequestId, SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {
+
+		final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
+			slotRequestId);
+		final SharedSlot sharedSlot = new SharedSlot(allocatedSlotContext, slotOwner, slotSharingGroupAssignment);
 
 		if (logicalSlotReference.compareAndSet(null, sharedSlot)) {
 
@@ -236,6 +245,43 @@ public class AllocatedSlot implements SlotContext {
 
 	@Override
 	public String toString() {
-		return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
+		return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
+	}
+
+	/**
+	 * Slot context for {@link AllocatedSlot}.
+	 */
+	private final class AllocatedSlotContext implements SlotContext {
+
+		private final SlotRequestID slotRequestId;
+
+		private AllocatedSlotContext(SlotRequestID slotRequestId) {
+			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
+		}
+
+		@Override
+		public SlotRequestID getSlotRequestId() {
+			return slotRequestId;
+		}
+
+		@Override
+		public AllocationID getAllocationId() {
+			return allocationId;
+		}
+
+		@Override
+		public TaskManagerLocation getTaskManagerLocation() {
+			return taskManagerLocation;
+		}
+
+		@Override
+		public int getPhysicalSlotNumber() {
+			return physicalSlotNumber;
+		}
+
+		@Override
+		public TaskManagerGateway getTaskManagerGateway() {
+			return taskManagerGateway;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 54c8971..44ee29d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -276,12 +276,15 @@ public class Instance implements SlotOwner {
 	 * <p>The method will transition the slot to the "released" state. If the slot is already in state
 	 * "released", this method will do nothing.</p>
 	 * 
-	 * @param slot The slot to return.
+	 * @param logicalSlot The slot to return.
 	 * @return Future which is completed with true, if the slot was returned, false if not.
 	 */
 	@Override
-	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-		checkNotNull(slot);
+	public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+		checkNotNull(logicalSlot);
+		checkArgument(logicalSlot instanceof Slot);
+
+		final Slot slot = ((Slot) logicalSlot);
 		checkArgument(!slot.isAlive(), "slot is still alive");
 		checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager.");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
index e663265..b3104ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
@@ -93,6 +93,14 @@ public interface LogicalSlot {
 	AllocationID getAllocationId();
 
 	/**
+	 * Gets the slot request id uniquely identifying the request with which this
+	 * slot has been allocated.
+	 *
+	 * @return Unique id identifying the slot request with which this slot was allocated
+	 */
+	SlotRequestID getSlotRequestId();
+
+	/**
 	 * Payload for a logical slot.
 	 */
 	interface Payload {

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 8637159..8c9fe1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -213,6 +213,11 @@ public class SharedSlot extends Slot implements LogicalSlot {
 		return getSlotContext().getAllocationId();
 	}
 
+	@Override
+	public SlotRequestID getSlotRequestId() {
+		return getSlotContext().getSlotRequestId();
+	}
+
 	/**
 	 * Gets the set of all slots allocated as sub-slots of this shared slot.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index d397c08..e98832f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -97,6 +97,7 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 			parent != null ?
 				parent.getSlotContext() :
 				new SimpleSlotContext(
+					NO_SLOT_REQUEST_ID,
 					NO_ALLOCATION_ID,
 					location,
 					slotNumber,
@@ -274,6 +275,11 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 		return getSlotContext().getAllocationId();
 	}
 
+	@Override
+	public SlotRequestID getSlotRequestId() {
+		return getSlotContext().getSlotRequestId();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 6262c9a..e82f075 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -61,7 +61,8 @@ public abstract class Slot {
 	private static final int RELEASED = 2;
 
 	// temporary placeholder for Slots that are not constructed from an AllocatedSlot (prior to FLIP-6)
-	protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0, 0);
+	protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0L, 0L);
+	protected static final SlotRequestID NO_SLOT_REQUEST_ID = new SlotRequestID(0L, 0L);
 
 	// ------------------------------------------------------------------------
 
@@ -111,6 +112,7 @@ public abstract class Slot {
 
 		// create a simple slot context
 		this.slotContext = new SimpleSlotContext(
+			NO_SLOT_REQUEST_ID,
 			NO_ALLOCATION_ID,
 			location,
 			slotNumber,

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 2ccea75..a72f57b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.SlotException;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
@@ -278,25 +277,31 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	@Override
-	public void returnAllocatedSlot(SlotContext allocatedSlot) {
-		internalReturnAllocatedSlot(allocatedSlot.getAllocationId());
+	public void returnAllocatedSlot(SlotRequestID slotRequestId) {
+		final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
+
+		if (allocatedSlot != null) {
+			internalReturnAllocatedSlot(allocatedSlot);
+		} else {
+			log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId);
+		}
 	}
 
 	@Override
-	public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId) {
-		final PendingRequest pendingRequest = removePendingRequest(requestId);
+	public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
+		final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
 
 		if (pendingRequest != null) {
-			failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled."));
+			failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + slotRequestId + " cancelled."));
 		} else {
-			final AllocatedSlot allocatedSlot = allocatedSlots.get(requestId);
+			final AllocatedSlot allocatedSlot = allocatedSlots.get(slotRequestId);
 
 			if (allocatedSlot != null) {
-				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, requestId);
+				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, slotRequestId);
 				// TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot
 				allocatedSlot.triggerLogicalSlotRelease();
 			} else {
-				LOG.debug("There was no slot allocation with {} to be cancelled.", requestId);
+				LOG.debug("There was no slot allocation with {} to be cancelled.", slotRequestId);
 			}
 		}
 
@@ -316,7 +321,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 			final SimpleSlot simpleSlot;
 			try {
-				simpleSlot = allocatedSlot.allocateSimpleSlot(slotFromPool.locality());
+				simpleSlot = allocatedSlot.allocateSimpleSlot(requestId, slotFromPool.locality());
 			} catch (SlotException e) {
 				availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 
@@ -335,9 +340,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		return allocatedSlotFuture.thenApply(
 			(AllocatedSlot allocatedSlot) -> {
 				try {
-					return allocatedSlot.allocateSimpleSlot(Locality.UNKNOWN);
+					return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN);
 				} catch (SlotException e) {
-					returnAllocatedSlot(allocatedSlot);
+					internalReturnAllocatedSlot(allocatedSlot);
 
 					throw new CompletionException("Could not allocate a logical simple slot.", e);
 				}
@@ -495,31 +500,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	 * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
 	 * slot can be reused by other pending requests if the resource profile matches.n
 	 *
-	 * @param allocationId identifying the slot which is returned
+	 * @param allocatedSlot which shall be returned
 	 */
-	private void internalReturnAllocatedSlot(AllocationID allocationId) {
-		final AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationId);
-
-		if (allocatedSlot != null) {
-			if (allocatedSlot.releaseLogicalSlot()) {
+	private void internalReturnAllocatedSlot(AllocatedSlot allocatedSlot) {
+		if (allocatedSlot.releaseLogicalSlot()) {
 
-				final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
+			final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
 
-				if (pendingRequest != null) {
-					LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
-						pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
+			if (pendingRequest != null) {
+				LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+					pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
-					allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
-					pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
-				} else {
-					LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
-					availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
-				}
+				allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
+				pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
 			} else {
-				LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
+				LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+				availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 			}
 		} else {
-			LOG.debug("Could not find allocated slot {}. Ignoring returning slot.", allocationId);
+			LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
 		}
 	}
 
@@ -820,25 +819,48 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		/**
-		 * Remove an allocation with slot.
+		 * Removes the allocated slot specified by the provided slot allocation id.
 		 *
-		 * @param slotId The ID of the slot to be removed
+		 * @param allocationID identifying the allocated slot to remove
+		 * @return The removed allocated slot or null.
 		 */
-		AllocatedSlot remove(final AllocationID slotId) {
-			AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(slotId);
+		@Nullable
+		AllocatedSlot remove(final AllocationID allocationID) {
+			AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(allocationID);
+
 			if (allocatedSlot != null) {
-				final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
-				Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
+				removeAllocatedSlot(allocatedSlot);
+			}
 
-				slotsForTM.remove(allocatedSlot);
+			return allocatedSlot;
+		}
 
-				if (slotsForTM.isEmpty()) {
-					allocatedSlotsByTaskManager.remove(taskManagerId);
-				}
-				return allocatedSlot;
+		/**
+		 * Removes the allocated slot specified by the provided slot request id.
+		 *
+		 * @param slotRequestId identifying the allocated slot to remove
+		 * @return The removed allocated slot or null.
+		 */
+		@Nullable
+		AllocatedSlot remove(final SlotRequestID slotRequestId) {
+			final AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyB(slotRequestId);
+
+			if (allocatedSlot != null) {
+				removeAllocatedSlot(allocatedSlot);
 			}
-			else {
-				return null;
+
+			return allocatedSlot;
+		}
+
+		private void removeAllocatedSlot(final AllocatedSlot allocatedSlot) {
+			Preconditions.checkNotNull(allocatedSlot);
+			final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
+			Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
+
+			slotsForTM.remove(allocatedSlot);
+
+			if (slotsForTM.isEmpty()) {
+				allocatedSlotsByTaskManager.remove(taskManagerId);
 			}
 		}
 
@@ -1106,8 +1128,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		@Override
-		public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-			gateway.returnAllocatedSlot(slot.getSlotContext());
+		public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot slot) {
+			gateway.returnAllocatedSlot(slot.getSlotRequestId());
 			return CompletableFuture.completedFuture(true);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 71de054..103bc61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -31,7 +30,6 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -99,20 +97,14 @@ public interface SlotPoolGateway extends RpcGateway {
 			Iterable<TaskManagerLocation> locationPreferences,
 			@RpcTimeout Time timeout);
 
-	void returnAllocatedSlot(SlotContext slotInformation);
+	void returnAllocatedSlot(SlotRequestID slotRequestId);
 
 	/**
 	 * Cancel a slot allocation request.
 	 *
-	 * @param requestId identifying the slot allocation request
+	 * @param slotRequestId identifying the slot allocation request
 	 * @return Future acknowledge if the slot allocation has been cancelled
 	 */
-	CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId);
+	CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId);
 
-	/**
-	 * Request ID identifying different slot requests.
-	 */
-	final class SlotRequestID extends AbstractID {
-		private static final long serialVersionUID = -6072105912250154283L;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
new file mode 100644
index 0000000..8e19944
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Request ID identifying different slot requests.
+ */
+public final class SlotRequestID extends AbstractID {
+    private static final long serialVersionUID = -6072105912250154283L;
+
+    public SlotRequestID(long lowerPart, long upperPart) {
+        super(lowerPart, upperPart);
+    }
+
+    public SlotRequestID() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
index 5dccc1f..a5b75d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager.slots;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
@@ -27,6 +28,8 @@ import org.apache.flink.util.Preconditions;
  */
 public class SimpleSlotContext implements SlotContext {
 
+	private final SlotRequestID slotRequestId;
+
 	private final AllocationID allocationId;
 
 	private final TaskManagerLocation taskManagerLocation;
@@ -36,10 +39,12 @@ public class SimpleSlotContext implements SlotContext {
 	private final TaskManagerGateway taskManagerGateway;
 
 	public SimpleSlotContext(
+			SlotRequestID slotRequestId,
 			AllocationID allocationId,
 			TaskManagerLocation taskManagerLocation,
 			int physicalSlotNumber,
 			TaskManagerGateway taskManagerGateway) {
+		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.physicalSlotNumber = physicalSlotNumber;
@@ -47,6 +52,11 @@ public class SimpleSlotContext implements SlotContext {
 	}
 
 	@Override
+	public SlotRequestID getSlotRequestId() {
+		return slotRequestId;
+	}
+
+	@Override
 	public AllocationID getAllocationId() {
 		return allocationId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
index d8a1aa4..1e0317a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.slots;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 /**
@@ -30,9 +31,17 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 public interface SlotContext {
 
 	/**
-	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
+	 * Gets the slot request id under which the slot has been requested. This id uniquely identifies the logical slot.
 	 *
-	 * @return The ID under which the slot is allocated
+	 * @return The id under which the slot has been requested
+	 */
+	SlotRequestID getSlotRequestId();
+
+	/**
+	 * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the
+	 * physical slot.
+	 *
+	 * @return The id under which the slot has been allocated on the TaskManager
 	 */
 	AllocationID getAllocationId();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
index cb4488d..bc1ced4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -30,8 +30,8 @@ public interface SlotOwner {
 	/**
 	 * Return the given slot to the slot owner.
 	 *
-	 * @param slot to return
+	 * @param logicalSlot to return
 	 * @return Future which is completed with true if the slot could be returned, otherwise with false
 	 */
-	CompletableFuture<Boolean> returnAllocatedSlot(Slot slot);
+	CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 586f51b..18e6cf1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -284,7 +284,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
 		final TestingSlotOwner slotOwner = new TestingSlotOwner();
 		slotOwner.setReturnAllocatedSlotConsumer(
-			(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
+			(LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId()));
 
 		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
 		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@@ -365,7 +365,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(2);
 		final TestingSlotOwner slotOwner = new TestingSlotOwner();
 		slotOwner.setReturnAllocatedSlotConsumer(
-			(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
+			(LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId()));
 
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
 		final SimpleSlot[] slots = new SimpleSlot[parallelism];
@@ -448,6 +448,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
 
 		SimpleSlotContext slot = new SimpleSlotContext(
+			new SlotRequestID(),
 			new AllocationID(),
 			location,
 			0,

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 06ffaa0..c97329f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -244,10 +245,11 @@ public class ExecutionGraphTestUtils {
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
 
 		final SimpleSlotContext allocatedSlot = new SimpleSlotContext(
-				new AllocationID(),
-				location,
-				0,
-				gateway);
+			new SlotRequestID(),
+			new AllocationID(),
+			location,
+			0,
+			gateway);
 
 		return new SimpleSlot(
 			allocatedSlot,

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 71d6f51..e3fd0df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
@@ -306,7 +305,7 @@ public class ExecutionTest extends TestLogger {
 
 		Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
 
-		CompletableFuture<Slot> returnedSlotFuture = slotOwner.getReturnedSlotFuture();
+		CompletableFuture<LogicalSlot> returnedSlotFuture = slotOwner.getReturnedSlotFuture();
 		CompletableFuture<?> terminationFuture = executionVertex.cancel();
 
 		// run canceling in a separate thread to allow an interleaving between termination
@@ -334,15 +333,15 @@ public class ExecutionTest extends TestLogger {
 	 */
 	private static final class SingleSlotTestingSlotOwner implements SlotOwner {
 
-		final CompletableFuture<Slot> returnedSlot = new CompletableFuture<>();
+		final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture<>();
 
-		public CompletableFuture<Slot> getReturnedSlotFuture() {
+		public CompletableFuture<LogicalSlot> getReturnedSlotFuture() {
 			return returnedSlot;
 		}
 
 		@Override
-		public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-			return CompletableFuture.completedFuture(returnedSlot.complete(slot));
+		public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+			return CompletableFuture.completedFuture(returnedSlot.complete(logicalSlot));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 7f97d12..63cebf3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
@@ -371,8 +371,8 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 		result.getPartitions()[0].addConsumerGroup();
 		result.getPartitions()[0].addConsumer(mockEdge, 0);
 
-		AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
-		when(allocatedSlot.getAllocationId()).thenReturn(new AllocationID());
+		SlotContext slotContext = mock(SlotContext.class);
+		when(slotContext.getAllocationId()).thenReturn(new AllocationID());
 
 		LogicalSlot slot = mock(LogicalSlot.class);
 		when(slot.getAllocationId()).thenReturn(new AllocationID());

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 98f7259..bffbb6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -31,13 +31,14 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -234,7 +235,11 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		//  - exposing test methods in the ExecutionVertex leads to undesirable setters 
 
 		SlotContext slot = new SimpleSlotContext(
-				new AllocationID(), location, 0, mock(TaskManagerGateway.class));
+			new SlotRequestID(),
+			new AllocationID(),
+			location,
+			0,
+			mock(TaskManagerGateway.class));
 
 		SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 9a19d24..82953d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.SlotContext;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 
 import java.net.InetAddress;
 import java.util.ArrayDeque;
@@ -61,10 +63,11 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 
 		for (int i = 0; i < numSlots; i++) {
 			SimpleSlotContext as = new SimpleSlotContext(
-					new AllocationID(),
-					new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
-					0,
-					taskManagerGateway);
+				new SlotRequestID(),
+				new AllocationID(),
+				new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
+				0,
+				taskManagerGateway);
 			slots.add(as);
 		}
 	}
@@ -94,7 +97,11 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 	}
 
 	@Override
-	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+	public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+		Preconditions.checkArgument(logicalSlot instanceof Slot);
+
+		final Slot slot = ((Slot) logicalSlot);
+
 		synchronized (slots) {
 			slots.add(slot.getSlotContext());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
index bc396c1..223d43c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -41,7 +41,7 @@ public class AllocatedSlotsTest extends TestLogger {
 		SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
 
 		final AllocationID allocation1 = new AllocationID();
-		final SlotPoolGateway.SlotRequestID slotRequestID = new SlotPoolGateway.SlotRequestID();
+		final SlotRequestID slotRequestID = new SlotRequestID();
 		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
 		final ResourceID resource1 = taskManagerLocation.getResourceID();
 		final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation);
@@ -56,7 +56,7 @@ public class AllocatedSlotsTest extends TestLogger {
 		assertEquals(1, allocatedSlots.size());
 
 		final AllocationID allocation2 = new AllocationID();
-		final SlotPoolGateway.SlotRequestID slotRequestID2 = new SlotPoolGateway.SlotRequestID();
+		final SlotRequestID slotRequestID2 = new SlotRequestID();
 		final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation);
 
 		allocatedSlots.add(slotRequestID2, slot2);
@@ -71,7 +71,7 @@ public class AllocatedSlotsTest extends TestLogger {
 		assertEquals(2, allocatedSlots.size());
 
 		final AllocationID allocation3 = new AllocationID();
-		final SlotPoolGateway.SlotRequestID slotRequestID3 = new SlotPoolGateway.SlotRequestID();
+		final SlotRequestID slotRequestID3 = new SlotRequestID();
 		final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
 		final ResourceID resource2 = taskManagerLocation2.getResourceID();
 		final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2);

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 5d82f47..60e1d34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -111,7 +111,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			pool.start(JobMasterId.generate(), "foobar");
 
 			CompletableFuture<LogicalSlot> future = pool.allocateSlot(
-				new SlotPoolGateway.SlotRequestID(),
+				new SlotRequestID(),
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
@@ -144,7 +144,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			pool.start(JobMasterId.generate(), "foobar");
 			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
 
-			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
+			SlotRequestID requestId = new SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
@@ -188,7 +188,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
 			pool.connectToResourceManager(resourceManagerGateway);
 
-			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
+			SlotRequestID requestId = new SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
@@ -239,7 +239,7 @@ public class SlotPoolRpcTest extends TestLogger {
 
 			pool.connectToResourceManager(resourceManagerGateway);
 
-			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
+			SlotRequestID requestId = new SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
@@ -295,7 +295,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
-		final CompletableFuture<SlotPoolGateway.SlotRequestID> cancelFuture = new CompletableFuture<>();
+		final CompletableFuture<SlotRequestID> cancelFuture = new CompletableFuture<>();
 
 		pool.setCancelSlotAllocationConsumer(
 			slotRequestID -> cancelFuture.complete(slotRequestID));

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 9d90a12..ec20f6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -103,7 +102,7 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
-			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
+			SlotRequestID requestId = new SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future.isDone());
 
@@ -137,8 +136,8 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
-			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			assertFalse(future1.isDone());
 			assertFalse(future2.isDone());
@@ -187,7 +186,7 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
-			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future1.isDone());
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -208,7 +207,7 @@ public class SlotPoolTest extends TestLogger {
 			// return this slot to pool
 			slot1.releaseSlot();
 
-			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			// second allocation fulfilled by previous slot returning
 			LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
@@ -233,7 +232,7 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
-			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future.isDone());
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -284,8 +283,8 @@ public class SlotPoolTest extends TestLogger {
 
 		final SlotPool slotPool = new SlotPool(rpcService, jobId) {
 			@Override
-			public void returnAllocatedSlot(SlotContext allocatedSlot) {
-				super.returnAllocatedSlot(allocatedSlot);
+			public void returnAllocatedSlot(SlotRequestID slotRequestId) {
+				super.returnAllocatedSlot(slotRequestId);
 
 				slotReturnFuture.complete(true);
 			}
@@ -295,14 +294,14 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
-			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
 			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
 			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequest.getAllocationId(),
@@ -357,7 +356,7 @@ public class SlotPoolTest extends TestLogger {
 			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
 
 			CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
-				new SlotPoolGateway.SlotRequestID(),
+				new SlotRequestID(),
 				scheduledUnit,
 				ResourceProfile.UNKNOWN,
 				Collections.emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
index 925933d..2066017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
@@ -44,27 +44,32 @@ public class TestingLogicalSlot implements LogicalSlot {
 	private final int slotNumber;
 
 	private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
-
+	
 	private final AllocationID allocationId;
 
+	private final SlotRequestID slotRequestId;
+
 	public TestingLogicalSlot() {
 		this(
 			new LocalTaskManagerLocation(),
 			new SimpleAckingTaskManagerGateway(),
 			0,
-			new AllocationID());
+			new AllocationID(),
+			new SlotRequestID());
 	}
 
 	public TestingLogicalSlot(
 			TaskManagerLocation taskManagerLocation,
 			TaskManagerGateway taskManagerGateway,
 			int slotNumber,
-			AllocationID allocationId) {
+			AllocationID allocationId,
+			SlotRequestID slotRequestId) {
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
 		this.payloadReference = new AtomicReference<>();
 		this.slotNumber = slotNumber;
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
 	}
 
 	@Override
@@ -109,4 +114,9 @@ public class TestingLogicalSlot implements LogicalSlot {
 	public AllocationID getAllocationId() {
 		return allocationId;
 	}
+
+	@Override
+	public SlotRequestID getSlotRequestId() {
+		return slotRequestId;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
index 6894542..6d17ad0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture;
  */
 public class DummySlotOwner implements SlotOwner {
 	@Override
-	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+	public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
 		return CompletableFuture.completedFuture(false);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
index 7c124ef..e7f9485 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
@@ -28,18 +28,18 @@ import java.util.function.Consumer;
  */
 public class TestingSlotOwner implements SlotOwner {
 
-	private volatile Consumer<Slot> returnAllocatedSlotConsumer;
+	private volatile Consumer<LogicalSlot> returnAllocatedSlotConsumer;
 
-	public void setReturnAllocatedSlotConsumer(Consumer<Slot> returnAllocatedSlotConsumer) {
+	public void setReturnAllocatedSlotConsumer(Consumer<LogicalSlot> returnAllocatedSlotConsumer) {
 		this.returnAllocatedSlotConsumer = returnAllocatedSlotConsumer;
 	}
 
 	@Override
-	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-		final Consumer<Slot> currentReturnAllocatedSlotConsumer = this.returnAllocatedSlotConsumer;
+	public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+		final Consumer<LogicalSlot> currentReturnAllocatedSlotConsumer = this.returnAllocatedSlotConsumer;
 
 		if (currentReturnAllocatedSlotConsumer != null) {
-			currentReturnAllocatedSlotConsumer.accept(slot);
+			currentReturnAllocatedSlotConsumer.accept(logicalSlot);
 		}
 
 		return CompletableFuture.completedFuture(true);