You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:25 UTC

[06/52] [abbrv] flink git commit: [FLINK-4839] [cluster management] JobManager handle TaskManager's slot offering

[FLINK-4839] [cluster management] JobManager handle TaskManager's slot offering

This closes #2647 #2643.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af924b48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af924b48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af924b48

Branch: refs/heads/master
Commit: af924b489420b8d7163e6216c0efb05e3ab30514
Parents: a7ed9a5
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Oct 17 18:15:26 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java | 46 +++++++++---
 .../flink/runtime/jobmaster/JobMaster.java      | 50 ++++++++++++-
 .../runtime/jobmaster/JobMasterGateway.java     | 26 +++++--
 .../resourcemanager/ResourceManager.java        |  2 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 34 +++++----
 .../runtime/taskexecutor/slot/SlotOffer.java    | 79 ++++++++++++++++++++
 .../runtime/taskexecutor/slot/TaskSlot.java     | 13 ++++
 .../taskexecutor/slot/TaskSlotTable.java        | 12 +--
 .../runtime/taskexecutor/TaskExecutorTest.java  | 16 +++-
 9 files changed, 231 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 7e7b21e..44df29b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -267,7 +267,7 @@ public class SlotPool implements SlotOwner {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Slot De-allocation
+	//  Slot releasing & offering
 	// ------------------------------------------------------------------------
 
 	/**
@@ -323,10 +323,6 @@ public class SlotPool implements SlotOwner {
 		return null;
 	}
 
-	// ------------------------------------------------------------------------
-	//  Slot Releasing
-	// ------------------------------------------------------------------------
-
 	/**
 	 * Release slot to TaskManager, called for finished tasks or canceled jobs.
 	 *
@@ -340,10 +336,6 @@ public class SlotPool implements SlotOwner {
 		}
 	}
 
-	// ------------------------------------------------------------------------
-	//  Slot Offering
-	// ------------------------------------------------------------------------
-
 	/**
 	 * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and
 	 * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
@@ -401,6 +393,39 @@ public class SlotPool implements SlotOwner {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Error Handling
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Fail the specified allocation and release the corresponding slot if we have one.
+	 * This may triggered by JobManager when some slot allocation failed with timeout.
+	 * Or this could be triggered by TaskManager, when it finds out something went wrong with the slot,
+	 * and decided to take it back.
+	 *
+	 * @param allocationID Represents the allocation which should be failed
+	 * @param cause        The cause of the failure
+	 */
+	public void failAllocation(final AllocationID allocationID, final Exception cause) {
+		synchronized (lock) {
+			// 1. check whether the allocation still pending
+			Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest =
+					pendingRequests.get(allocationID);
+			if (pendingRequest != null) {
+				pendingRequest.f1.completeExceptionally(cause);
+				return;
+			}
+
+			// 2. check whether we have a free slot corresponding to this allocation id
+			// TODO: add allocation id to slot descriptor, so we can remove it by allocation id
+
+			// 3. check whether we have a in-use slot corresponding to this allocation id
+			// TODO: needs mechanism to release the in-use Slot but don't return it back to this pool
+
+			// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	//  Resource
 	// ------------------------------------------------------------------------
 
@@ -464,12 +489,13 @@ public class SlotPool implements SlotOwner {
 	 */
 	static class AllocatedSlots {
 
-		/** All allocated slots organized by TaskManager */
+		/** All allocated slots organized by TaskManager's id */
 		private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource;
 
 		/** All allocated slots organized by Slot object */
 		private final Map<Slot, AllocationID> allocatedSlots;
 
+		/** All allocated slot descriptors organized by Slot object */
 		private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor;
 
 		/** All allocated slots organized by AllocationID */

http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7bcfb3a..3c6bbd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotDescriptor;
 import org.apache.flink.runtime.instance.SlotPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -85,6 +86,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -95,7 +97,9 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
@@ -663,13 +667,51 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public Iterable<AllocationID> offerSlots(final Iterable<AllocationID> slots, UUID leaderId) {
-		throw new UnsupportedOperationException("Has to be implemented.");
+	public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId,
+			final Iterable<SlotOffer> slots, final UUID leaderId) throws Exception
+	{
+		if (!this.leaderSessionID.equals(leaderId)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderId);
+		}
+
+		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
+		if (taskManager == null) {
+			throw new Exception("Unknown TaskManager " + taskManagerId);
+		}
+
+		final Set<SlotOffer> acceptedSlotOffers = new HashSet<>(4);
+		for (SlotOffer slotOffer : slots) {
+			final SlotDescriptor slotDescriptor = new SlotDescriptor(
+					jobGraph.getJobID(),
+					taskManager.f0,
+					slotOffer.getSlotIndex(),
+					slotOffer.getResourceProfile(),
+					null); // TODO: replace the actor gateway with the new rpc gateway, it's ready (taskManager.f1)
+			if (slotPool.offerSlot(slotOffer.getAllocationId(), slotDescriptor)) {
+				acceptedSlotOffers.add(slotOffer);
+			}
+		}
+
+		return acceptedSlotOffers;
 	}
 
 	@RpcMethod
-	public void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause) {
-		throw new UnsupportedOperationException("Has to be implemented.");
+	public void failSlot(final ResourceID taskManagerId,
+			final AllocationID allocationId,
+			final UUID leaderId,
+			final Exception cause) throws Exception
+	{
+		if (!this.leaderSessionID.equals(leaderId)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderId);
+		}
+
+		if (!registeredTaskManagers.containsKey(taskManagerId)) {
+			throw new Exception("Unknown TaskManager " + taskManagerId);
+		}
+
+		slotPool.failAllocation(allocationId, cause);
 	}
 
 	@RpcMethod

http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 8925d94..2d7ebb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -166,21 +167,30 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	/**
 	 * Offer the given slots to the job manager. The response contains the set of accepted slots.
 	 *
-	 * @param slots to offer to the job manager
-	 * @param leaderId identifying the job leader
-	 * @param timeout for the rpc call
+	 * @param taskManagerId identifying the task manager
+	 * @param slots         to offer to the job manager
+	 * @param leaderId      identifying the job leader
+	 * @param timeout       for the rpc call
 	 * @return Future set of accepted slots.
 	 */
-	Future<Iterable<AllocationID>> offerSlots(final Iterable<AllocationID> slots, UUID leaderId, @RpcTimeout final Time timeout);
+	Future<Iterable<SlotOffer>> offerSlots(
+			final ResourceID taskManagerId,
+			final Iterable<SlotOffer> slots,
+			final UUID leaderId,
+			@RpcTimeout final Time timeout);
 
 	/**
 	 * Fail the slot with the given allocation id and cause.
 	 *
-	 * @param allocationId identifying the slot to fail
-	 * @param leaderId identifying the job leader
-	 * @param cause of the failing
+	 * @param taskManagerId identifying the task manager
+	 * @param allocationId  identifying the slot to fail
+	 * @param leaderId      identifying the job leader
+	 * @param cause         of the failing
 	 */
-	void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause);
+	void failSlot(final ResourceID taskManagerId,
+			final AllocationID allocationId,
+			final UUID leaderId,
+			final Exception cause);
 
 	/**
 	 * Register the task manager at the job manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3122804..f1a5073 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -69,7 +69,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * ResourceManager implementation. The resource manager is responsible for resource de-/allocation
  * and bookkeeping.
  *
- * It offers the following methods as part of its rpc interface to interact with the him remotely:
+ * It offers the following methods as part of its rpc interface to interact with him remotely:
  * <ul>
  *     <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>

http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5146e5b..679324b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -72,6 +72,8 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot
 import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -660,47 +662,49 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 				final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
 
-				final Iterator<AllocationID> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
+				final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
 				final UUID leaderId = jobManagerConnection.getLeaderId();
 
-				final Collection<AllocationID> reservedSlots = new HashSet<>(2);
+				final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
 
 				while (reservedSlotsIterator.hasNext()) {
-					reservedSlots.add(reservedSlotsIterator.next());
+					reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer());
 				}
 
-				Future<Iterable<AllocationID>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
+				Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
+					getResourceID(),
 					reservedSlots,
 					leaderId,
 					taskManagerConfiguration.getTimeout());
 
-				acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<AllocationID>>() {
+				acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() {
 					@Override
-					public void accept(Iterable<AllocationID> acceptedSlots) {
+					public void accept(Iterable<SlotOffer> acceptedSlots) {
 						// check if the response is still valid
 						if (isJobManagerConnectionValid(jobId, leaderId)) {
 							// mark accepted slots active
-							for (AllocationID acceptedSlot: acceptedSlots) {
+							for (SlotOffer acceptedSlot: acceptedSlots) {
 								try {
-									if (!taskSlotTable.markSlotActive(acceptedSlot)) {
+									if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
 										// the slot is either free or releasing at the moment
 										final String message = "Could not mark slot " + jobId + " active.";
 										log.debug(message);
-										jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message));
+										jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(),
+												leaderId, new Exception(message));
 									}
 
 									// remove the assigned slots so that we can free the left overs
 									reservedSlots.remove(acceptedSlot);
 								} catch (SlotNotFoundException e) {
 									log.debug("Could not mark slot {} active.", acceptedSlot,  e);
-									jobMasterGateway.failSlot(acceptedSlot, leaderId, e);
+									jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), leaderId, e);
 								}
 							}
 
 							final Exception e = new Exception("The slot was rejected by the JobManager.");
 
-							for (AllocationID rejectedSlot: reservedSlots) {
-								freeSlot(rejectedSlot, e);
+							for (SlotOffer rejectedSlot: reservedSlots) {
+								freeSlot(rejectedSlot.getAllocationId(), e);
 							}
 						} else {
 							// discard the response since there is a new leader for the job
@@ -718,8 +722,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 							offerSlotsToJobManager(jobId);
 						} else {
 							// We encountered an exception. Free the slots and return them to the RM.
-							for (AllocationID reservedSlot: reservedSlots) {
-								freeSlot(reservedSlot, throwable);
+							for (SlotOffer reservedSlot: reservedSlots) {
+								freeSlot(reservedSlot.getAllocationId(), throwable);
 							}
 						}
 
@@ -870,7 +874,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	private void unregisterTaskAndNotifyFinalState(
 			final UUID jobMasterLeaderId,
-			final JobMasterGateway jobMasterGateway,		
+			final JobMasterGateway jobMasterGateway,
 			final ExecutionAttemptID executionAttemptID) {
 
 		Task task = taskSlotTable.removeTask(executionAttemptID);

http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
new file mode 100644
index 0000000..f8d7e6c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Describe the slot offering to job manager provided by task manager.
+ */
+public class SlotOffer implements Serializable {
+
+	private static final long serialVersionUID = -7067814231108250971L;
+
+	/** Allocation id of this slot, this would be the only identifier for this slot offer */
+	private AllocationID allocationId;
+
+	/** Index of the offered slot */
+	private final int slotIndex;
+
+	/** The resource profile of the offered slot */
+	private final ResourceProfile resourceProfile;
+
+	public SlotOffer(final AllocationID allocationID, final int index, final ResourceProfile resourceProfile) {
+		Preconditions.checkArgument(0 <= index, "The index must be greater than 0.");
+		this.allocationId = Preconditions.checkNotNull(allocationID);
+		this.slotIndex = index;
+		this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+	}
+
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	public int getSlotIndex() {
+		return slotIndex;
+	}
+
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		SlotOffer slotOffer = (SlotOffer) o;
+		return allocationId.equals(slotOffer.allocationId);
+	}
+
+	@Override
+	public int hashCode() {
+		return allocationId.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 0942772..e12c15b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -286,4 +286,17 @@ public class TaskSlot {
 		state = TaskSlotState.RELEASING;
 		return true;
 	}
+
+	/**
+	 * Generate the slot offer from this TaskSlot.
+	 *
+	 * @return The sot offer which this task slot can provide
+	 */
+	public SlotOffer generateSlotOffer() {
+		Preconditions.checkState(TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state,
+				"The task slot is not in state active or allocated.");
+		Preconditions.checkState(allocationId != null, "The task slot are not allocated");
+
+		return new SlotOffer(allocationId, index, resourceProfile);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 88123b4..88b83a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -70,7 +70,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 
 	/** Interface for slot actions, such as freeing them or timing them out */
 	private SlotActions slotActions;
-	
+
 	/** Whether the table has been started */
 	private boolean started;
 
@@ -250,7 +250,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 */
 	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
 		checkInit();
-		
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Free slot {}.", allocationId, cause);
 		} else {
@@ -370,13 +370,13 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	}
 
 	/**
-	 * Return an iterator of allocated slots (their allocation ids) for the given job id.
+	 * Return an iterator of allocated slots for the given job id.
 	 *
 	 * @param jobId for which to return the allocated slots
-	 * @return Iterator of allocation ids of allocated slots.
+	 * @return Iterator of allocated slots.
 	 */
-	public Iterator<AllocationID> getAllocatedSlots(JobID jobId) {
-		return new AllocationIDIterator(jobId, TaskSlotState.ALLOCATED);
+	public Iterator<TaskSlot> getAllocatedSlots(JobID jobId) {
+		return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 55cc142..4d73a4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -406,6 +407,7 @@ public class TaskExecutorTest extends TestLogger {
 
 		final AllocationID allocationId = new AllocationID();
 		final SlotID slotId = new SlotID(resourceId, 0);
+		final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
 
 		try {
 			TaskExecutor taskManager = new TaskExecutor(
@@ -440,7 +442,11 @@ public class TaskExecutorTest extends TestLogger {
 			jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId);
 
 			// the job leader should get the allocation id offered
-			verify(jobMasterGateway).offerSlots((Iterable<AllocationID>)Matchers.argThat(contains(allocationId)), eq(jobManagerLeaderId), any(Time.class));
+			verify(jobMasterGateway).offerSlots(
+					any(ResourceID.class),
+					(Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+					eq(jobManagerLeaderId),
+					any(Time.class));
 		} finally {
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowException();
@@ -496,6 +502,9 @@ public class TaskExecutorTest extends TestLogger {
 		final AllocationID allocationId1 = new AllocationID();
 		final AllocationID allocationId2 = new AllocationID();
 
+		final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
+		final SlotOffer offer2 = new SlotOffer(allocationId2, 0, ResourceProfile.UNKNOWN);
+
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
 		when(jobMasterGateway.registerTaskManager(
@@ -506,8 +515,9 @@ public class TaskExecutorTest extends TestLogger {
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
 
-		when(jobMasterGateway.offerSlots(any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
-			.thenReturn(FlinkCompletableFuture.completed((Iterable<AllocationID>)Collections.singleton(allocationId1)));
+		when(jobMasterGateway.offerSlots(
+				any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
+			.thenReturn(FlinkCompletableFuture.completed((Iterable<SlotOffer>)Collections.singleton(offer1)));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 		rpc.registerGateway(jobManagerAddress, jobMasterGateway);