You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:25 UTC
[06/52] [abbrv] flink git commit: [FLINK-4839] [cluster management]
JobManager handle TaskManager's slot offering
[FLINK-4839] [cluster management] JobManager handle TaskManager's slot offering
This closes #2647 #2643.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af924b48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af924b48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af924b48
Branch: refs/heads/master
Commit: af924b489420b8d7163e6216c0efb05e3ab30514
Parents: a7ed9a5
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Oct 17 18:15:26 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/instance/SlotPool.java | 46 +++++++++---
.../flink/runtime/jobmaster/JobMaster.java | 50 ++++++++++++-
.../runtime/jobmaster/JobMasterGateway.java | 26 +++++--
.../resourcemanager/ResourceManager.java | 2 +-
.../runtime/taskexecutor/TaskExecutor.java | 34 +++++----
.../runtime/taskexecutor/slot/SlotOffer.java | 79 ++++++++++++++++++++
.../runtime/taskexecutor/slot/TaskSlot.java | 13 ++++
.../taskexecutor/slot/TaskSlotTable.java | 12 +--
.../runtime/taskexecutor/TaskExecutorTest.java | 16 +++-
9 files changed, 231 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 7e7b21e..44df29b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -267,7 +267,7 @@ public class SlotPool implements SlotOwner {
}
// ------------------------------------------------------------------------
- // Slot De-allocation
+ // Slot releasing & offering
// ------------------------------------------------------------------------
/**
@@ -323,10 +323,6 @@ public class SlotPool implements SlotOwner {
return null;
}
- // ------------------------------------------------------------------------
- // Slot Releasing
- // ------------------------------------------------------------------------
-
/**
* Release slot to TaskManager, called for finished tasks or canceled jobs.
*
@@ -340,10 +336,6 @@ public class SlotPool implements SlotOwner {
}
}
- // ------------------------------------------------------------------------
- // Slot Offering
- // ------------------------------------------------------------------------
-
/**
* Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and
* transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
@@ -401,6 +393,39 @@ public class SlotPool implements SlotOwner {
}
// ------------------------------------------------------------------------
+ // Error Handling
+ // ------------------------------------------------------------------------
+
+ /**
+ * Fail the specified allocation and release the corresponding slot if we have one.
+ * This may be triggered by the JobManager when a slot allocation fails with a timeout,
+ * or by the TaskManager when it finds out something went wrong with the slot
+ * and decides to take it back.
+ *
+ * @param allocationID Represents the allocation which should be failed
+ * @param cause The cause of the failure
+ */
+ public void failAllocation(final AllocationID allocationID, final Exception cause) {
+ synchronized (lock) {
+ // 1. check whether the allocation is still pending
+ Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest =
+ pendingRequests.get(allocationID);
+ if (pendingRequest != null) {
+ pendingRequest.f1.completeExceptionally(cause); // NOTE(review): the entry is looked up with get() and not removed from pendingRequests here - confirm that is intended
+ return;
+ }
+
+ // 2. check whether we have a free slot corresponding to this allocation id
+ // TODO: add allocation id to slot descriptor, so we can remove it by allocation id
+
+ // 3. check whether we have an in-use slot corresponding to this allocation id
+ // TODO: needs mechanism to release the in-use Slot but don't return it back to this pool
+
+ // TODO: add some unit tests when the previous two are ready, the allocation may fail at any phase
+ }
+ }
+
+ // ------------------------------------------------------------------------
// Resource
// ------------------------------------------------------------------------
@@ -464,12 +489,13 @@ public class SlotPool implements SlotOwner {
*/
static class AllocatedSlots {
- /** All allocated slots organized by TaskManager */
+ /** All allocated slots organized by TaskManager's id */
private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource;
/** All allocated slots organized by Slot object */
private final Map<Slot, AllocationID> allocatedSlots;
+ /** All allocated slot descriptors organized by Slot object */
private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor;
/** All allocated slots organized by AllocationID */
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7bcfb3a..3c6bbd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotDescriptor;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -85,6 +86,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.SerializedThrowable;
@@ -95,7 +97,9 @@ import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
@@ -663,13 +667,51 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
@RpcMethod
- public Iterable<AllocationID> offerSlots(final Iterable<AllocationID> slots, UUID leaderId) {
- throw new UnsupportedOperationException("Has to be implemented.");
+ public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId,
+ final Iterable<SlotOffer> slots, final UUID leaderId) throws Exception
+ {
+ if (!this.leaderSessionID.equals(leaderId)) { // reject calls that carry a different leader session id
+ throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+ + ", actual: " + leaderId);
+ }
+
+ Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
+ if (taskManager == null) { // only TaskManagers found in registeredTaskManagers may offer slots
+ throw new Exception("Unknown TaskManager " + taskManagerId);
+ }
+
+ final Set<SlotOffer> acceptedSlotOffers = new HashSet<>(4);
+ for (SlotOffer slotOffer : slots) {
+ final SlotDescriptor slotDescriptor = new SlotDescriptor(
+ jobGraph.getJobID(),
+ taskManager.f0,
+ slotOffer.getSlotIndex(),
+ slotOffer.getResourceProfile(),
+ null); // TODO: replace the actor gateway with the new rpc gateway, it's ready (taskManager.f1)
+ if (slotPool.offerSlot(slotOffer.getAllocationId(), slotDescriptor)) { // keep only the offers the slot pool accepted
+ acceptedSlotOffers.add(slotOffer);
+ }
+ }
+
+ return acceptedSlotOffers;
}
@RpcMethod
- public void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause) {
- throw new UnsupportedOperationException("Has to be implemented.");
+ public void failSlot(final ResourceID taskManagerId,
+ final AllocationID allocationId,
+ final UUID leaderId,
+ final Exception cause) throws Exception
+ {
+ if (!this.leaderSessionID.equals(leaderId)) { // reject calls that carry a different leader session id
+ throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+ + ", actual: " + leaderId);
+ }
+
+ if (!registeredTaskManagers.containsKey(taskManagerId)) { // only TaskManagers found in registeredTaskManagers may fail a slot
+ throw new Exception("Unknown TaskManager " + taskManagerId);
+ }
+
+ slotPool.failAllocation(allocationId, cause); // the slot pool handles the failed allocation
}
@RpcMethod
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 8925d94..2d7ebb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -166,21 +167,30 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
/**
* Offer the given slots to the job manager. The response contains the set of accepted slots.
*
- * @param slots to offer to the job manager
- * @param leaderId identifying the job leader
- * @param timeout for the rpc call
+ * @param taskManagerId identifying the task manager
+ * @param slots to offer to the job manager
+ * @param leaderId identifying the job leader
+ * @param timeout for the rpc call
* @return Future set of accepted slots.
*/
- Future<Iterable<AllocationID>> offerSlots(final Iterable<AllocationID> slots, UUID leaderId, @RpcTimeout final Time timeout);
+ Future<Iterable<SlotOffer>> offerSlots(
+ final ResourceID taskManagerId,
+ final Iterable<SlotOffer> slots,
+ final UUID leaderId,
+ @RpcTimeout final Time timeout);
/**
* Fail the slot with the given allocation id and cause.
*
- * @param allocationId identifying the slot to fail
- * @param leaderId identifying the job leader
- * @param cause of the failing
+ * @param taskManagerId identifying the task manager
+ * @param allocationId identifying the slot to fail
+ * @param leaderId identifying the job leader
+ * @param cause of the failing
*/
- void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause);
+ void failSlot(final ResourceID taskManagerId,
+ final AllocationID allocationId,
+ final UUID leaderId,
+ final Exception cause);
/**
* Register the task manager at the job manager.
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3122804..f1a5073 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -69,7 +69,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* ResourceManager implementation. The resource manager is responsible for resource de-/allocation
* and bookkeeping.
*
- * It offers the following methods as part of its rpc interface to interact with the him remotely:
+ * It offers the following methods as part of its rpc interface to interact with it remotely:
* <ul>
* <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
* <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5146e5b..679324b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -72,6 +72,8 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
@@ -660,47 +662,49 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
- final Iterator<AllocationID> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
+ final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
final UUID leaderId = jobManagerConnection.getLeaderId();
- final Collection<AllocationID> reservedSlots = new HashSet<>(2);
+ final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
while (reservedSlotsIterator.hasNext()) {
- reservedSlots.add(reservedSlotsIterator.next());
+ reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer());
}
- Future<Iterable<AllocationID>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
+ Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
+ getResourceID(),
reservedSlots,
leaderId,
taskManagerConfiguration.getTimeout());
- acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<AllocationID>>() {
+ acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() {
@Override
- public void accept(Iterable<AllocationID> acceptedSlots) {
+ public void accept(Iterable<SlotOffer> acceptedSlots) {
// check if the response is still valid
if (isJobManagerConnectionValid(jobId, leaderId)) {
// mark accepted slots active
- for (AllocationID acceptedSlot: acceptedSlots) {
+ for (SlotOffer acceptedSlot: acceptedSlots) {
try {
- if (!taskSlotTable.markSlotActive(acceptedSlot)) {
+ if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
// the slot is either free or releasing at the moment
final String message = "Could not mark slot " + jobId + " active.";
log.debug(message);
- jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message));
+ jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(),
+ leaderId, new Exception(message));
}
// remove the assigned slots so that we can free the left overs
reservedSlots.remove(acceptedSlot);
} catch (SlotNotFoundException e) {
log.debug("Could not mark slot {} active.", acceptedSlot, e);
- jobMasterGateway.failSlot(acceptedSlot, leaderId, e);
+ jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), leaderId, e);
}
}
final Exception e = new Exception("The slot was rejected by the JobManager.");
- for (AllocationID rejectedSlot: reservedSlots) {
- freeSlot(rejectedSlot, e);
+ for (SlotOffer rejectedSlot: reservedSlots) {
+ freeSlot(rejectedSlot.getAllocationId(), e);
}
} else {
// discard the response since there is a new leader for the job
@@ -718,8 +722,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
offerSlotsToJobManager(jobId);
} else {
// We encountered an exception. Free the slots and return them to the RM.
- for (AllocationID reservedSlot: reservedSlots) {
- freeSlot(reservedSlot, throwable);
+ for (SlotOffer reservedSlot: reservedSlots) {
+ freeSlot(reservedSlot.getAllocationId(), throwable);
}
}
@@ -870,7 +874,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private void unregisterTaskAndNotifyFinalState(
final UUID jobMasterLeaderId,
- final JobMasterGateway jobMasterGateway,
+ final JobMasterGateway jobMasterGateway,
final ExecutionAttemptID executionAttemptID) {
Task task = taskSlotTable.removeTask(executionAttemptID);
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
new file mode 100644
index 0000000..f8d7e6c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Describes a slot that a task manager offers to a job manager.
+ */
+public class SlotOffer implements Serializable {
+
+ private static final long serialVersionUID = -7067814231108250971L;
+
+ /** Allocation id of this slot; it is the only field that equals() and hashCode() look at */
+ private AllocationID allocationId; // NOTE(review): assigned only in the constructor here, yet not final like the other two fields
+
+ /** Index of the offered slot */
+ private final int slotIndex;
+
+ /** The resource profile of the offered slot */
+ private final ResourceProfile resourceProfile;
+
+ public SlotOffer(final AllocationID allocationID, final int index, final ResourceProfile resourceProfile) {
+ Preconditions.checkArgument(0 <= index, "The index must be greater than 0."); // NOTE(review): the check accepts index 0, so the message should say "must not be negative"
+ this.allocationId = Preconditions.checkNotNull(allocationID);
+ this.slotIndex = index;
+ this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+ }
+
+ public AllocationID getAllocationId() {
+ return allocationId;
+ }
+
+ public int getSlotIndex() {
+ return slotIndex;
+ }
+
+ public ResourceProfile getResourceProfile() {
+ return resourceProfile;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SlotOffer slotOffer = (SlotOffer) o;
+ return allocationId.equals(slotOffer.allocationId); // slot index and resource profile are deliberately not compared
+ }
+
+ @Override
+ public int hashCode() {
+ return allocationId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 0942772..e12c15b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -286,4 +286,17 @@ public class TaskSlot {
state = TaskSlotState.RELEASING;
return true;
}
+
+ /**
+ * Generate the slot offer from this TaskSlot; the state checks fail unless the slot is ACTIVE or ALLOCATED and has an allocation id.
+ *
+ * @return The slot offer which this task slot can provide
+ */
+ public SlotOffer generateSlotOffer() {
+ Preconditions.checkState(TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state,
+ "The task slot is not in state active or allocated.");
+ Preconditions.checkState(allocationId != null, "The task slot are not allocated");
+
+ return new SlotOffer(allocationId, index, resourceProfile);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 88123b4..88b83a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -70,7 +70,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
/** Interface for slot actions, such as freeing them or timing them out */
private SlotActions slotActions;
-
+
/** Whether the table has been started */
private boolean started;
@@ -250,7 +250,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
*/
public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
checkInit();
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("Free slot {}.", allocationId, cause);
} else {
@@ -370,13 +370,13 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
}
/**
- * Return an iterator of allocated slots (their allocation ids) for the given job id.
+ * Return an iterator of allocated slots for the given job id.
*
* @param jobId for which to return the allocated slots
- * @return Iterator of allocation ids of allocated slots.
+ * @return Iterator of allocated slots.
*/
- public Iterator<AllocationID> getAllocatedSlots(JobID jobId) {
- return new AllocationIDIterator(jobId, TaskSlotState.ALLOCATED);
+ public Iterator<TaskSlot> getAllocatedSlots(JobID jobId) {
+ return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 55cc142..4d73a4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -406,6 +407,7 @@ public class TaskExecutorTest extends TestLogger {
final AllocationID allocationId = new AllocationID();
final SlotID slotId = new SlotID(resourceId, 0);
+ final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
try {
TaskExecutor taskManager = new TaskExecutor(
@@ -440,7 +442,11 @@ public class TaskExecutorTest extends TestLogger {
jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId);
// the job leader should get the allocation id offered
- verify(jobMasterGateway).offerSlots((Iterable<AllocationID>)Matchers.argThat(contains(allocationId)), eq(jobManagerLeaderId), any(Time.class));
+ verify(jobMasterGateway).offerSlots(
+ any(ResourceID.class),
+ (Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+ eq(jobManagerLeaderId),
+ any(Time.class));
} finally {
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowException();
@@ -496,6 +502,9 @@ public class TaskExecutorTest extends TestLogger {
final AllocationID allocationId1 = new AllocationID();
final AllocationID allocationId2 = new AllocationID();
+ final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
+ final SlotOffer offer2 = new SlotOffer(allocationId2, 0, ResourceProfile.UNKNOWN);
+
final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
when(jobMasterGateway.registerTaskManager(
@@ -506,8 +515,9 @@ public class TaskExecutorTest extends TestLogger {
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
- when(jobMasterGateway.offerSlots(any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.completed((Iterable<AllocationID>)Collections.singleton(allocationId1)));
+ when(jobMasterGateway.offerSlots(
+ any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.completed((Iterable<SlotOffer>)Collections.singleton(offer1)));
rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
rpc.registerGateway(jobManagerAddress, jobMasterGateway);