You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:18 UTC

[38/50] [abbrv] flink git commit: [FLINK-4360] [tm] Implement TM -> JM registration logic

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
new file mode 100644
index 0000000..66d8102
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+/**
+ * Exception indicating that the slot allocation on the task manager failed.
+ */
+public class SlotAllocationException extends TaskManagerException {
+
+	private static final long serialVersionUID = -4764932098204266773L;
+
+	public SlotAllocationException(String message) {
+		super(message);
+	}
+
+	public SlotAllocationException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public SlotAllocationException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 42cb919..88123b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -70,10 +70,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 
 	/** Interface for slot actions, such as freeing them or timing them out */
 	private SlotActions slotActions;
-
-	/** The timeout for allocated slots */
-	private Time slotTimeout;
-
+	
 	/** Whether the table has been started */
 	private boolean started;
 
@@ -104,7 +101,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 		slotsPerJob = new HashMap<>(4);
 
 		slotActions = null;
-		slotTimeout = null;
 		started = false;
 	}
 
@@ -112,11 +108,9 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * Start the task slot table with the given slot actions and slot timeout value.
 	 *
 	 * @param initialSlotActions to use for slot actions
-	 * @param initialSlotTimeout to use for slot timeouts
 	 */
-	public void start(SlotActions initialSlotActions, Time initialSlotTimeout) {
+	public void start(SlotActions initialSlotActions) {
 		this.slotActions = Preconditions.checkNotNull(initialSlotActions);
-		this.slotTimeout = Preconditions.checkNotNull(initialSlotTimeout);
 
 		timerService.start(this);
 
@@ -129,7 +123,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	public void stop() {
 		started = false;
 		timerService.stop();
-		slotTimeout = null;
 		slotActions = null;
 	}
 
@@ -144,9 +137,10 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * @param index of the task slot to allocate
 	 * @param jobId to allocate the task slot for
 	 * @param allocationId identifying the allocation
+	 * @param slotTimeout duration after which the slot times out
 	 * @return True if the task slot could be allocated; otherwise false
 	 */
-	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) {
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
 		checkInit();
 
 		TaskSlot taskSlot = taskSlots.get(index);
@@ -180,7 +174,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 *
 	 * @param allocationId to identify the task slot to mark as active
 	 * @throws SlotNotFoundException if the slot could not be found for the given allocation id
-	 * @return True if the slot could be marked active
+	 * @return True if the slot could be marked active; otherwise false
 	 */
 	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
 		checkInit();
@@ -190,6 +184,8 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 		if (taskSlot != null) {
 			if (taskSlot.markActive()) {
 				// unregister a potential timeout
+				LOG.info("Activate slot {}.", allocationId);
+
 				timerService.unregisterTimeout(allocationId);
 
 				return true;
@@ -206,10 +202,11 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * then a {@link SlotNotFoundException} is thrown.
 	 *
 	 * @param allocationId to identify the task slot to mark as inactive
+	 * @param slotTimeout duration after which the slot times out
 	 * @throws SlotNotFoundException if the slot could not be found for the given allocation id
 	 * @return True if the slot could be marked inactive
 	 */
-	public boolean markSlotInactive(AllocationID allocationId) throws SlotNotFoundException {
+	public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
 		checkInit();
 
 		TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -253,6 +250,12 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 */
 	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
 		checkInit();
+		
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Free slot {}.", allocationId, cause);
+		} else {
+			LOG.info("Free slot {}.", allocationId);
+		}
 
 		TaskSlot taskSlot = getTaskSlot(allocationId);
 
@@ -322,8 +325,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * @return True if the given task slot is allocated for the given job and allocation id
 	 */
 	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
-		checkInit();
-
 		TaskSlot taskSlot = taskSlots.get(index);
 
 		return taskSlot.isAllocated(jobId, allocationId);
@@ -336,7 +337,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * @param allocationId identifying the allocation
 	 * @return True if there exists a task slot which is active for the given job and allocation id.
 	 */
-	public boolean existActiveSlot(JobID jobId, AllocationID allocationId) {
+	public boolean existsActiveSlot(JobID jobId, AllocationID allocationId) {
 		TaskSlot taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null) {
@@ -431,6 +432,8 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * @return The removed task if there is any for the given execution attempt id; otherwise null
 	 */
 	public Task removeTask(ExecutionAttemptID executionAttemptID) {
+		checkInit();
+
 		TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
 
 		if (taskSlotMapping != null) {
@@ -481,6 +484,8 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 
 	@Override
 	public void notifyTimeout(AllocationID key, UUID ticket) {
+		checkInit();
+
 		if (slotActions != null) {
 			slotActions.timeoutSlot(key, ticket);
 		}
@@ -493,9 +498,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	private TaskSlot getTaskSlot(AllocationID allocationId) {
 		Preconditions.checkNotNull(allocationId);
 
-		TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId);
-
-		return taskSlot;
+		return allocationIDTaskSlotMap.get(allocationId);
 	}
 
 	private void checkInit() {

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f16255e..ccca3c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -265,7 +265,7 @@ public class Task implements Runnable, TaskActions {
 		this.jobId = checkNotNull(tdd.getJobID());
 		this.vertexId = checkNotNull(tdd.getVertexID());
 		this.executionId  = checkNotNull(tdd.getExecutionId());
-		this.allocationId = checkNotNull(tdd.getAllocationID());
+		this.allocationId = checkNotNull(tdd.getAllocationId());
 		this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
 		this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
 		this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 39ea176..993fd19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -44,6 +45,7 @@ public class TaskDeploymentDescriptorTest {
 	public void testSerialization() {
 		try {
 			final JobID jobID = new JobID();
+			final AllocationID allocationId = new AllocationID();
 			final JobVertexID vertexID = new JobVertexID();
 			final ExecutionAttemptID execId = new ExecutionAttemptID();
 			final String jobName = "job name";
@@ -61,7 +63,7 @@ public class TaskDeploymentDescriptorTest {
 			final List<URL> requiredClasspaths = new ArrayList<URL>(0);
 			final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
 
-			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, jobName, vertexID, execId,
+			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, allocationId, jobName, vertexID, execId,
 				executionConfig, taskName, numberOfKeyGroups, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber,
 				jobConfiguration, taskConfiguration, invokableClass.getName(), producedResults, inputGates,
 				requiredJars, requiredClasspaths, 47);
@@ -69,12 +71,14 @@ public class TaskDeploymentDescriptorTest {
 			final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
 	
 			assertFalse(orig.getJobID() == copy.getJobID());
+			assertFalse(orig.getAllocationId() == copy.getAllocationId());
 			assertFalse(orig.getVertexID() == copy.getVertexID());
 			assertFalse(orig.getTaskName() == copy.getTaskName());
 			assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration());
 			assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration());
 
 			assertEquals(orig.getJobID(), copy.getJobID());
+			assertEquals(orig.getAllocationId(), copy.getAllocationId());
 			assertEquals(orig.getVertexID(), copy.getVertexID());
 			assertEquals(orig.getTaskName(), copy.getTaskName());
 			assertEquals(orig.getNumberOfKeyGroups(), copy.getNumberOfKeyGroups());

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index a255027..38e372d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) throws Exception {
 		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
 		if (service != null) {
 			return service;

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 49f2268..073aeac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -75,7 +76,8 @@ public class TaskManagerGroupTest extends TestLogger {
 		final ExecutionAttemptID execution21 = new ExecutionAttemptID();
 
 		TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
-			jid1, 
+			jid1,
+			new AllocationID(),
 			jobName1, 
 			vertex11, 
 			execution11, 
@@ -91,6 +93,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 		TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
 			jid1,
+			new AllocationID(),
 			jobName1,
 			vertex12,
 			execution12,
@@ -106,6 +109,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 		TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
 			jid2,
+			new AllocationID(),
 			jobName2,
 			vertex21,
 			execution21,
@@ -121,6 +125,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 		TaskDeploymentDescriptor tdd4 = new TaskDeploymentDescriptor(
 			jid1,
+			new AllocationID(),
 			jobName1,
 			vertex13,
 			execution13,
@@ -192,6 +197,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 		TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
 			jid1,
+			new AllocationID(),
 			jobName1,
 			vertex11,
 			execution11,
@@ -207,6 +213,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 		TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
 			jid1,
+			new AllocationID(),
 			jobName1,
 			vertex12,
 			execution12,
@@ -222,6 +229,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 		TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
 			jid2,
+			new AllocationID(),
 			jobName2,
 			vertex21,
 			execution21,

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 558d3c2..948c129 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -68,7 +68,7 @@ public class SlotManagerTest {
 		taskExecutorRegistration = Mockito.mock(TaskExecutorRegistration.class);
 		TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
 		Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway);
-		Mockito.when(gateway.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class)))
+		Mockito.when(gateway.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
 			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 24d959e..86cd1f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -144,7 +144,7 @@ public class SlotProtocolTest extends TestLogger {
 		Mockito
 			.when(
 				taskExecutorGateway
-					.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class)))
+					.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
 			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
@@ -161,7 +161,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		// 4) Slot becomes available and TaskExecutor gets a SlotRequest
 		verify(taskExecutorGateway, timeout(5000))
-			.requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class));
+			.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
 	}
 
 	/**
@@ -189,7 +189,7 @@ public class SlotProtocolTest extends TestLogger {
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		Mockito.when(
 			taskExecutorGateway
-				.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class)))
+				.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
 			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
@@ -240,7 +240,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		// 4) a SlotRequest is routed to the TaskExecutor
 		verify(taskExecutorGateway, timeout(5000))
-			.requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class));
+			.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
 	}
 
 	private static TestingLeaderElectionService configureHA(

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index e7143ae..bbde331 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -287,17 +288,27 @@ public class RpcCompletenessTest extends TestLogger {
 				if (!futureClass.equals(RpcCompletenessTest.futureClass)) {
 					return false;
 				} else {
-					Class<?> valueClass = ReflectionUtil.getTemplateType1(gatewayMethod.getGenericReturnType());
+					ReflectionUtil.FullTypeInfo fullValueTypeInfo = ReflectionUtil.getFullTemplateType(gatewayMethod.getGenericReturnType(), 0);
 
 					if (endpointMethod.getReturnType().equals(futureClass)) {
-						Class<?> rpcEndpointValueClass = ReflectionUtil.getTemplateType1(endpointMethod.getGenericReturnType());
+						ReflectionUtil.FullTypeInfo fullRpcEndpointValueTypeInfo = ReflectionUtil.getFullTemplateType(endpointMethod.getGenericReturnType(), 0);
 
 						// check if we have the same future value types
-						if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) {
-							return false;
+						if (fullValueTypeInfo != null && fullRpcEndpointValueTypeInfo != null) {
+							Iterator<Class<?>> valueClasses = fullValueTypeInfo.getClazzIterator();
+							Iterator<Class<?>> rpcClasses = fullRpcEndpointValueTypeInfo.getClazzIterator();
+
+							while (valueClasses.hasNext() && rpcClasses.hasNext()) {
+								if (!checkType(valueClasses.next(), rpcClasses.next())) {
+									return false;
+								}
+							}
+
+							// the types match only if both iterators are exhausted
+							return !valueClasses.hasNext() && !rpcClasses.hasNext();
 						}
 					} else {
-						if (valueClass != null && !checkType(valueClass, endpointMethod.getReturnType())) {
+						if (fullValueTypeInfo != null && !checkType(fullValueTypeInfo.getClazz(), endpointMethod.getReturnType())) {
 							return false;
 						}
 					}
@@ -342,16 +353,16 @@ public class RpcCompletenessTest extends TestLogger {
 		if (method.getReturnType().equals(Void.TYPE)) {
 			builder.append("void").append(" ");
 		} else if (method.getReturnType().equals(futureClass)) {
-			Class<?> valueClass = ReflectionUtil.getTemplateType1(method.getGenericReturnType());
+			ReflectionUtil.FullTypeInfo fullTypeInfo = ReflectionUtil.getFullTemplateType(method.getGenericReturnType(), 0);
 
 			builder
 				.append(futureClass.getSimpleName())
 				.append("<")
-				.append(valueClass != null ? valueClass.getSimpleName() : "")
+				.append(fullTypeInfo != null ? fullTypeInfo.toString() : "")
 				.append(">");
 
-			if (valueClass != null) {
-				builder.append("/").append(valueClass.getSimpleName());
+			if (fullTypeInfo != null) {
+				builder.append("/").append(fullTypeInfo);
 			}
 
 			builder.append(" ");

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index baae251..23c6833 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -18,21 +18,45 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
@@ -40,19 +64,42 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
+import org.junit.rules.TestName;
+import org.mockito.Matchers;
 import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
+import static org.hamcrest.Matchers.contains;
 
 public class TaskExecutorTest extends TestLogger {
 
+	@Rule
+	public TestName name = new TestName();
+
+
 	@Test
 	public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
 		final ResourceID resourceID = ResourceID.generate();
@@ -85,6 +132,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
 				mock(TaskSlotTable.class),
+				mock(JobManagerTable.class),
+				mock(JobLeaderService.class),
 				mock(FatalErrorHandler.class));
 
 			taskManager.start();
@@ -142,6 +191,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
 				mock(TaskSlotTable.class),
+				mock(JobManagerTable.class),
+				mock(JobLeaderService.class),
 				mock(FatalErrorHandler.class));
 
 			taskManager.start();
@@ -173,17 +224,385 @@ public class TaskExecutorTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that we can submit a task to the TaskManager given that we've allocated a slot there.
+	 */
+	@Test(timeout = 1000L)
+	public void testTaskSubmission() throws Exception {
+		final Configuration config = new Configuration();
+		final TestingSerialRpcService rpcService = new TestingSerialRpcService();
+		final TaskManagerConfiguration tmConfiguration = TaskManagerConfiguration.fromConfiguration(config);
+
+		// identifiers shared between the deployment descriptor and the mocked services
+		final JobID jobId = new JobID();
+		final JobVertexID jobVertexId = new JobVertexID();
+		final AllocationID allocationId = new AllocationID();
+		final UUID jobManagerLeaderId = UUID.randomUUID();
+
+		final TaskDeploymentDescriptor taskDescriptor = new TaskDeploymentDescriptor(
+			jobId,
+			allocationId,
+			name.getMethodName(),
+			jobVertexId,
+			new ExecutionAttemptID(),
+			new SerializedValue<>(new ExecutionConfig()),
+			"test task",
+			1,
+			0,
+			1,
+			0,
+			config,
+			config,
+			TestInvokable.class.getName(),
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			0);
+
+		// the task slot table reports the slot as active and accepts any added task
+		final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+		when(taskSlotTable.existsActiveSlot(eq(jobId), eq(allocationId))).thenReturn(true);
+		when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
+
+		final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+		when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class));
+
+		final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
+		when(taskManagerMetricGroup.addTaskForJob(eq(taskDescriptor))).thenReturn(mock(TaskMetricGroup.class));
+
+		final HighAvailabilityServices highAvailabilityServices = mock(HighAvailabilityServices.class);
+		when(highAvailabilityServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
+
+		// library cache manager which hands out this test's class loader for the job
+		final LibraryCacheManager libraryCache = mock(LibraryCacheManager.class);
+		when(libraryCache.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+
+		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+			mock(JobMasterGateway.class),
+			jobManagerLeaderId,
+			mock(TaskManagerActions.class),
+			mock(CheckpointResponder.class),
+			libraryCache,
+			mock(ResultPartitionConsumableNotifier.class),
+			mock(PartitionStateChecker.class));
+
+		final JobManagerTable jobManagerTable = new JobManagerTable();
+		jobManagerTable.put(jobId, jobManagerConnection);
+
+		try {
+			// everything else the task executor needs is mocked
+			final TaskExecutor taskExecutor = new TaskExecutor(
+				tmConfiguration,
+				mock(TaskManagerLocation.class),
+				rpcService,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				networkEnvironment,
+				highAvailabilityServices,
+				mock(MetricRegistry.class),
+				taskManagerMetricGroup,
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
+				taskSlotTable,
+				jobManagerTable,
+				mock(JobLeaderService.class),
+				mock(FatalErrorHandler.class));
+
+			taskExecutor.start();
+
+			taskExecutor.submitTask(taskDescriptor, jobManagerLeaderId);
+
+			// TestInvokable completes this future from its invoke() method
+			final Future<Boolean> invoked = TestInvokable.completableFuture;
+			invoked.get();
+		} finally {
+			rpcService.stopService();
+		}
+	}
+
+	/**
+	 * Invokable used by the task submission test: running it completes a shared static future.
+	 */
+	public static class TestInvokable extends AbstractInvokable {
+		/** Completed with {@code true} once {@link #invoke()} has run. */
+		static final CompletableFuture<Boolean> completableFuture = new FlinkCompletableFuture<>();
+
+		@Override
+		public void invoke() throws Exception {
+			completableFuture.complete(Boolean.TRUE);
+		}
+	}
+
+	/**
+	 * Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
+	 * the job leader, it will offer all reserved slots to the JobManager.
+	 */
+	@Test
+	public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingException, SlotAllocationException {
+		final JobID jobId = new JobID();
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		final Configuration configuration = new Configuration();
+		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+		final ResourceID resourceId = new ResourceID("foobar");
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TimerService<AllocationID> timerService = mock(TimerService.class);
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService);
+		final JobManagerTable jobManagerTable = new JobManagerTable();
+		final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
+		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
+		haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
+
+		final String resourceManagerAddress = "rm";
+		final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+		final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
+		final InstanceID registrationId = new InstanceID();
+		// the resource manager answers the registration of this task executor with a success
+		when(resourceManagerGateway.registerTaskExecutor(
+			eq(resourceManagerLeaderId),
+			any(String.class),
+			eq(resourceId),
+			any(SlotReport.class),
+			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+
+		final String jobManagerAddress = "jm";
+		final UUID jobManagerLeaderId = UUID.randomUUID();
+		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
+		final int blobPort = 42;
+
+		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+		// the job manager answers the registration of this task manager with a success
+		when(jobMasterGateway.registerTaskManager(
+			any(String.class),
+			eq(resourceId),
+			eq(jobManagerLeaderId),
+			any(Time.class)
+		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+
+		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
+		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+
+		final AllocationID allocationId = new AllocationID();
+		final SlotID slotId = new SlotID(resourceId, 0);
+
+		try {
+			TaskExecutor taskManager = new TaskExecutor(
+				taskManagerConfiguration,
+				taskManagerLocation,
+				rpc,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices,
+				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
+				taskSlotTable,
+				jobManagerTable,
+				jobLeaderService,
+				testingFatalErrorHandler);
+
+			taskManager.start();
+
+			// tell the task manager about the rm leader
+			resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId);
+
+			// request slots from the task manager under the given allocation id
+			TMSlotRequestReply reply = taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId);
+
+			// the slot request must have been registered by the task manager
+			assertTrue(reply instanceof TMSlotRequestRegistered);
+
+			// now inform the task manager about the new job leader
+			jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId);
+
+			// the job leader should get the allocation id offered
+			verify(jobMasterGateway).offerSlots((Iterable<AllocationID>)Matchers.argThat(contains(allocationId)), eq(jobManagerLeaderId), any(Time.class));
+		} finally {
+			// stop the rpc service first so that it is released even if the check below throws
+			rpc.stopService();
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowException();
+		}
+	}
+
+	/**
+	 * Tests that accepted slots go into state assigned and the others are returned to the resource
+	 * manager.
+	 */
+	@Test
+	public void testSlotAcceptance() throws Exception {
+		final JobID jobId = new JobID();
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		final Configuration configuration = new Configuration();
+		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+		final ResourceID resourceId = new ResourceID("foobar");
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TimerService<AllocationID> timerService = mock(TimerService.class);
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService);
+		final JobManagerTable jobManagerTable = new JobManagerTable();
+		final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final String resourceManagerAddress = "rm";
+		final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+		final String jobManagerAddress = "jm";
+		final UUID jobManagerLeaderId = UUID.randomUUID();
+
+		final LeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId);
+		final LeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId);
+		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
+		haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
+
+		final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
+		final InstanceID registrationId = new InstanceID();
+		// the resource manager answers the registration of this task executor with a success
+		when(resourceManagerGateway.registerTaskExecutor(
+			eq(resourceManagerLeaderId),
+			any(String.class),
+			eq(resourceId),
+			any(SlotReport.class),
+			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+
+		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
+		final int blobPort = 42;
+
+		final AllocationID allocationId1 = new AllocationID();
+		final AllocationID allocationId2 = new AllocationID();
+
+		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+		// the job manager answers the registration of this task manager with a success
+		when(jobMasterGateway.registerTaskManager(
+			any(String.class),
+			eq(resourceId),
+			eq(jobManagerLeaderId),
+			any(Time.class)
+		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+		// of all offered slots, the job manager accepts only the first allocation
+		when(jobMasterGateway.offerSlots(any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
+			.thenReturn(FlinkCompletableFuture.completed((Iterable<AllocationID>)Collections.singleton(allocationId1)));
+
+		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
+		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+
+		try {
+			TaskExecutor taskManager = new TaskExecutor(
+				taskManagerConfiguration,
+				taskManagerLocation,
+				rpc,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices,
+				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
+				taskSlotTable,
+				jobManagerTable,
+				jobLeaderService,
+				testingFatalErrorHandler);
+
+			taskManager.start();
+
+			taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L));
+			taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds(10000L));
+
+			// we have to add the job after the TaskExecutor, because otherwise the service has not
+			// been properly started.
+			jobLeaderService.addJob(jobId, jobManagerAddress);
+			// the slot of the allocation which was not accepted must be reported as available
+			verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1)));
+
+			assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
+			assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
+			assertTrue(taskSlotTable.isSlotFree(1));
+		} finally {
+			// stop the rpc service first so that it is released even if the check below throws
+			rpc.stopService();
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowException();
+		}
+	}
+
+	/** Test {@link FatalErrorHandler} which logs every reported error and keeps the first one. */
+	private static class TestingFatalErrorHandler implements FatalErrorHandler {
+		private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
+		private final AtomicReference<Throwable> firstError;
+
+		public TestingFatalErrorHandler() {
+			this.firstError = new AtomicReference<>();
+		}
+
+		/** Throws the recorded error wrapped in a {@link TestingException}; does nothing if none was recorded. */
+		public void rethrowException() throws TestingException {
+			final Throwable error = getException();
+			if (error == null) {
+				return;
+			}
+			throw new TestingException(error);
+		}
+
+		public boolean hasExceptionOccurred() {
+			return getException() != null;
+		}
+
+		/** Returns the recorded error, or {@code null} if none has been reported so far. */
+		public Throwable getException() {
+			return firstError.get();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			LOG.error("OnFatalError:", exception);
+			// only the first reported error is kept; later ones are logged but not stored
+			firstError.compareAndSet(null, exception);
+		}
+
+		private static final class TestingException extends Exception {
+			private static final long serialVersionUID = -4648195335470914498L;
+
+			public TestingException(Throwable cause) {
+				super(cause);
+			}
+
+			public TestingException(String message) {
+				super(message);
+			}
+
+			public TestingException(String message, Throwable cause) {
+				super(message, cause);
+			}
+		}
+	}
+
+	/**
 	 * Tests that all allocation requests for slots are ignored if the slot has been reported as
 	 * free by the TaskExecutor but this report hasn't been confirmed by the ResourceManager.
 	 *
 	 * This is essential for the correctness of the state of the ResourceManager.
 	 */
+	@Ignore
 	@Test
-	public void testRejectAllocationRequestsForOutOfSyncSlots() {
+	public void testRejectAllocationRequestsForOutOfSyncSlots() throws SlotAllocationException {
 		final ResourceID resourceID = ResourceID.generate();
 
 		final String address1 = "/resource/manager/address/one";
 		final UUID leaderId = UUID.randomUUID();
+		final JobID jobId = new JobID();
+		final String jobManagerAddress = "foobar";
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();
 		try {
@@ -215,6 +634,8 @@ public class TaskExecutorTest extends TestLogger {
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
 				mock(TaskSlotTable.class),
+				mock(JobManagerTable.class),
+				mock(JobLeaderService.class),
 				mock(FatalErrorHandler.class));
 
 			taskManager.start();
@@ -232,14 +653,14 @@ public class TaskExecutorTest extends TestLogger {
 
 			// test that allocating a slot works
 			final SlotID slotID = new SlotID(resourceID, 0);
-			TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, new AllocationID(), leaderId);
+			TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
 			assertTrue(tmSlotRequestReply instanceof TMSlotRequestRegistered);
 
+			// TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't need the SlotID...
 			// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
 			final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1);
-			taskManager.addUnconfirmedFreeSlotNotification(unconfirmedFreeSlotID);
 			TMSlotRequestReply tmSlotRequestReply2 =
-				taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId);
+				taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
 			assertTrue(tmSlotRequestReply2 instanceof TMSlotRequestRejected);
 
 			// re-register
@@ -250,7 +671,7 @@ public class TaskExecutorTest extends TestLogger {
 			// now we should be successful because the slots status has been synced
 			// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
 			TMSlotRequestReply tmSlotRequestReply3 =
-				taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId);
+				taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
 			assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered);
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 8fa7463..2a9ff61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -160,7 +161,7 @@ public class TaskAsyncCallTest {
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
+				new JobID(), new AllocationID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
 				new SerializedValue<>(new ExecutionConfig()),
 				"Test Task", 1, 0, 1, 0,
 				new Configuration(), new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d4efd24..d1909fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -163,12 +164,13 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid = new ExecutionAttemptID();
 				final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
 
-				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
-						"TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(),
-						TestInvokableCorrect.class.getName(),
-						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, new AllocationID(),
+					"TestJob", vid, eid, executionConfig,
+					"TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(),
+					TestInvokableCorrect.class.getName(),
+					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+					Collections.<InputGateDeploymentDescriptor>emptyList(),
+					new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 
 				new Within(d) {
@@ -265,7 +267,7 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
-						jid1, "TestJob1", vid1, eid1,
+						jid1, new AllocationID(), "TestJob1", vid1, eid1,
 						new SerializedValue<>(new ExecutionConfig()),
 						"TestTask1", 5, 1, 5, 0,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
@@ -274,7 +276,7 @@ public class TaskManagerTest extends TestLogger {
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
-						jid2, "TestJob2", vid2, eid2,
+						jid2, new AllocationID(), "TestJob2", vid2, eid2,
 						new SerializedValue<>(new ExecutionConfig()),
 						"TestTask2", 7, 2, 7, 0,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
@@ -404,13 +406,13 @@ public class TaskManagerTest extends TestLogger {
 
 				final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, new AllocationID(), "TestJob", vid1, eid1, executionConfig,
 						"TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, executionConfig,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, new AllocationID(), "TestJob", vid2, eid2, executionConfig,
 						"TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -531,7 +533,7 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
-						jid, "TestJob", vid1, eid1,
+						jid, new AllocationID(), "TestJob", vid1, eid1,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Sender", 1, 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
@@ -540,7 +542,7 @@ public class TaskManagerTest extends TestLogger {
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
-						jid, "TestJob", vid2, eid2,
+						jid, new AllocationID(), "TestJob", vid2, eid2,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 7, 2, 7, 0,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
@@ -636,7 +638,7 @@ public class TaskManagerTest extends TestLogger {
 						);
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
-						jid, "TestJob", vid1, eid1,
+						jid, new AllocationID(), "TestJob", vid1, eid1,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Sender", 1, 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
@@ -644,7 +646,7 @@ public class TaskManagerTest extends TestLogger {
 						Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
-						jid, "TestJob", vid2, eid2,
+						jid, new AllocationID(), "TestJob", vid2, eid2,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 7, 2, 7, 0,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
@@ -781,7 +783,7 @@ public class TaskManagerTest extends TestLogger {
 						);
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
-						jid, "TestJob", vid1, eid1,
+						jid, new AllocationID(), "TestJob", vid1, eid1,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Sender", 1, 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
@@ -789,7 +791,7 @@ public class TaskManagerTest extends TestLogger {
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
-						jid, "TestJob", vid2, eid2,
+						jid, new AllocationID(), "TestJob", vid2, eid2,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 7, 2, 7, 0,
 						new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
@@ -929,7 +931,7 @@ public class TaskManagerTest extends TestLogger {
 						new InputGateDeploymentDescriptor(resultId, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-						jid, "TestJob", vid, eid,
+						jid, new AllocationID(), "TestJob", vid, eid,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 1, 0, 1, 0,
 						new Configuration(), new Configuration(),
@@ -1022,7 +1024,7 @@ public class TaskManagerTest extends TestLogger {
 						new InputGateDeploymentDescriptor(resultId, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-						jid, "TestJob", vid, eid,
+						jid, new AllocationID(), "TestJob", vid, eid,
 						new SerializedValue<>(new ExecutionConfig()),
 						"Receiver", 1, 0, 1, 0,
 						new Configuration(), new Configuration(),
@@ -1097,24 +1099,25 @@ public class TaskManagerTest extends TestLogger {
 
 				// Single blocking task
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-						new JobID(),
-						"Job",
-						new JobVertexID(),
-						new ExecutionAttemptID(),
-						new SerializedValue<>(new ExecutionConfig()),
-						"Task",
-						1,
-						0,
-						1,
-						0,
-						new Configuration(),
-						new Configuration(),
-						Tasks.BlockingNoOpInvokable.class.getName(),
-						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						Collections.<BlobKey>emptyList(),
-						Collections.<URL>emptyList(),
-						0);
+					new JobID(),
+					new AllocationID(),
+					"Job",
+					new JobVertexID(),
+					new ExecutionAttemptID(),
+					new SerializedValue<>(new ExecutionConfig()),
+					"Task",
+					1,
+					0,
+					1,
+					0,
+					new Configuration(),
+					new Configuration(),
+					Tasks.BlockingNoOpInvokable.class.getName(),
+					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+					Collections.<InputGateDeploymentDescriptor>emptyList(),
+					Collections.<BlobKey>emptyList(),
+					Collections.<URL>emptyList(),
+					0);
 
 				// Submit the task
 				new Within(d) {

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 5d3eb3a..7d466f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -69,7 +69,7 @@ public class TaskStopTest {
 		when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
 		when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
 		when(tddMock.getInvokableClassName()).thenReturn("className");
-		when(tddMock.getAllocationID()).thenReturn(mock(AllocationID.class));
+		when(tddMock.getAllocationId()).thenReturn(mock(AllocationID.class));
 
 		task = new Task(
 			tddMock,

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 50fc181..c5a9f2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -665,16 +666,18 @@ public class TaskTest {
 		}
 		
 		return new TaskDeploymentDescriptor(
-				new JobID(), "Test Job", new JobVertexID(), new ExecutionAttemptID(),
-				execConfig,
-				"Test Task", 1, 0, 1, 0,
-				new Configuration(), new Configuration(),
-				invokable.getName(),
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				0);
+			new JobID(),
+			new AllocationID(),
+			"Test Job", new JobVertexID(), new ExecutionAttemptID(),
+			execConfig,
+			"Test Task", 1, 0, 1, 0,
+			new Configuration(), new Configuration(),
+			invokable.getName(),
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			0);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 1077052..f095cf4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -139,21 +140,22 @@ public class InterruptSensitiveRestoreTest {
 				keyGroupStateFromStream);
 
 		return new TaskDeploymentDescriptor(
-				new JobID(),
-				"test job name",
-				new JobVertexID(),
-				new ExecutionAttemptID(),
-				new SerializedValue<>(new ExecutionConfig()),
-				"test task name",
-				1, 0, 1, 0,
-				new Configuration(),
-				taskConfig,
-				SourceStreamTask.class.getName(),
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				0,
+			new JobID(),
+			new AllocationID(),
+			"test job name",
+			new JobVertexID(),
+			new ExecutionAttemptID(),
+			new SerializedValue<>(new ExecutionConfig()),
+			"test task name",
+			1, 0, 1, 0,
+			new Configuration(),
+			taskConfig,
+			SourceStreamTask.class.getName(),
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			0,
 				taskStateHandles);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index bb246f9..52332a2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -220,17 +221,19 @@ public class StreamTaskTest {
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
-				new SerializedValue<>(new ExecutionConfig()),
-				"Test Task", 1, 0, 1, 0,
-				new Configuration(),
-				taskConfig.getConfiguration(),
-				invokable.getName(),
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				0);
+			new JobID(),
+			new AllocationID(),
+			"Job Name", new JobVertexID(), new ExecutionAttemptID(),
+			new SerializedValue<>(new ExecutionConfig()),
+			"Test Task", 1, 0, 1, 0,
+			new Configuration(),
+			taskConfig.getConfiguration(),
+			invokable.getName(),
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			0);
 
 		return new Task(
 			tdd,