You are viewing a plain-text version of this content. The canonical link for it was "here" in the original page (the hyperlink is not preserved in this copy).
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:17 UTC
[37/50] [abbrv] flink git commit: [FLINK-4489] [tm] Add TaskSlotTable to manage slot allocations for multiple job managers
[FLINK-4489] [tm] Add TaskSlotTable to manage slot allocations for multiple job managers
Add TimerService for slot timeouts
Add task and task slot access methods
Add comments to newly introduced classes
This closes #2638.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f3adc9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f3adc9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f3adc9f
Branch: refs/heads/flip-6
Commit: 5f3adc9f9d21ff80726f885c751c071a90318aa4
Parents: fe999e0
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 5 11:58:26 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:49:24 2016 +0200
----------------------------------------------------------------------
.../runtime/clusterframework/types/SlotID.java | 4 +-
.../resourcemanager/ResourceManager.java | 13 +-
.../resourcemanager/ResourceManagerGateway.java | 13 +-
.../runtime/taskexecutor/TaskExecutor.java | 194 +++---
.../runtime/taskexecutor/TaskManagerRunner.java | 1 +
.../taskexecutor/TaskManagerServices.java | 29 +-
.../TaskManagerServicesConfiguration.java | 2 +
.../flink/runtime/taskexecutor/TaskSlot.java | 73 --
.../runtime/taskexecutor/TaskSlotMapping.java | 44 --
.../runtime/taskexecutor/slot/SlotActions.java | 45 ++
.../slot/SlotNotActiveException.java | 34 +
.../slot/SlotNotFoundException.java | 37 +
.../runtime/taskexecutor/slot/TaskSlot.java | 289 ++++++++
.../taskexecutor/slot/TaskSlotState.java | 29 +
.../taskexecutor/slot/TaskSlotTable.java | 682 +++++++++++++++++++
.../taskexecutor/slot/TimeoutListener.java | 37 +
.../runtime/taskexecutor/slot/TimerService.java | 160 +++++
.../apache/flink/runtime/taskmanager/Task.java | 8 +-
.../PartialConsumePipelinedResultTest.java | 1 -
.../runtime/taskexecutor/TaskExecutorTest.java | 4 +
20 files changed, 1457 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index 237597b..d6409b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.types;
import java.io.Serializable;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -34,8 +35,9 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
/** The numeric id for single slot */
private final int slotNumber;
-
+
public SlotID(ResourceID resourceId, int slotNumber) {
+ checkArgument(0 <= slotNumber, "Slot number must be positive.");
this.resourceId = checkNotNull(resourceId, "ResourceID must not be null");
this.slotNumber = slotNumber;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 8fbb34b..3122804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
import org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -337,34 +336,32 @@ public abstract class ResourceManager<WorkerType extends Serializable>
/**
* Notification from a TaskExecutor that a slot has become available
* @param resourceManagerLeaderId TaskExecutor's resource manager leader id
- * @param resourceID TaskExecutor's resource id
* @param instanceID TaskExecutor's instance id
* @param slotID The slot id of the available slot
* @return SlotAvailableReply
*/
@RpcMethod
- public SlotAvailableReply notifySlotAvailable(
+ public void notifySlotAvailable(
final UUID resourceManagerLeaderId,
- final ResourceID resourceID,
final InstanceID instanceID,
final SlotID slotID) {
if (resourceManagerLeaderId.equals(leaderSessionID)) {
- WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceID);
+ final ResourceID resourceId = slotID.getResourceID();
+ WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId);
+
if (registration != null) {
InstanceID registrationInstanceID = registration.getInstanceID();
if (registrationInstanceID.equals(instanceID)) {
runAsync(new Runnable() {
@Override
public void run() {
- slotManager.notifySlotAvailable(resourceID, slotID);
+ slotManager.notifySlotAvailable(resourceId, slotID);
}
});
- return new SlotAvailableReply(leaderSessionID, slotID);
}
}
}
- return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 07e9e43..968eeb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -94,17 +93,13 @@ public interface ResourceManagerGateway extends RpcGateway {
* Sent by the TaskExecutor to notify the ResourceManager that a slot has become available.
*
* @param resourceManagerLeaderId The ResourceManager leader id
- * @param resourceID The ResourceID of the TaskExecutor
- * @param instanceID The InstanceID of the TaskExecutor
+ * @param instanceId TaskExecutor's instance id
* @param slotID The SlotID of the freed slot
- * @return The confirmation by the ResourceManager
*/
- Future<SlotAvailableReply> notifySlotAvailable(
+ void notifySlotAvailable(
UUID resourceManagerLeaderId,
- ResourceID resourceID,
- InstanceID instanceID,
- SlotID slotID,
- @RpcTimeout Time timeout);
+ InstanceID instanceId,
+ SlotID slotID);
/**
* Registers an infoMessage listener
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9f9234f..e642315 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
@@ -64,6 +65,10 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
+import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
+import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -76,7 +81,6 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -132,13 +136,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private Map<ResourceID, JobManagerConnection> jobManagerConnections;
- // --------- Slot allocation table --------
+ // --------- task slot allocation table -----------
- private Map<AllocationID, TaskSlot> taskSlots;
-
- // --------- Slot allocation table --------
-
- private Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+ private final TaskSlotTable taskSlotTable;
// ------------------------------------------------------------------------
@@ -154,6 +154,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
TaskManagerMetricGroup taskManagerMetricGroup,
BroadcastVariableManager broadcastVariableManager,
FileCache fileCache,
+ TaskSlotTable taskSlotTable,
FatalErrorHandler fatalErrorHandler) {
super(rpcService);
@@ -167,6 +168,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
this.networkEnvironment = checkNotNull(networkEnvironment);
this.haServices = checkNotNull(haServices);
this.metricRegistry = checkNotNull(metricRegistry);
+ this.taskSlotTable = checkNotNull(taskSlotTable);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
this.broadcastVariableManager = checkNotNull(broadcastVariableManager);
@@ -175,8 +177,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
this.jobManagerConnections = new HashMap<>(4);
this.unconfirmedFreeSlots = new HashSet<>();
- this.taskSlots = new HashMap<>(taskManagerConfiguration.getNumberSlots());
- this.taskSlotMappings = new HashMap<>(taskManagerConfiguration.getNumberSlots() * 2);
}
// ------------------------------------------------------------------------
@@ -193,6 +193,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
} catch (Exception e) {
onFatalErrorAsync(e);
}
+
+ // tell the task slot table who's responsible for the task slot actions
+ taskSlotTable.start(new SlotActionsImpl(), taskManagerConfiguration.getTimeout());
}
/**
@@ -202,6 +205,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
public void shutDown() {
log.info("Stopping TaskManager {}.", getAddress());
+ taskSlotTable.stop();
+
if (resourceManagerConnection.isConnected()) {
try {
resourceManagerConnection.close();
@@ -264,10 +269,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
throw new TaskSubmissionException(message);
}
- TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
-
- if (taskSlot == null) {
- final String message = "No task slot allocated for allocation ID " + tdd.getAllocationID() + '.';
+ if (!taskSlotTable.existActiveSlot(tdd.getJobID(), tdd.getAllocationID())) {
+ final String message = "No task slot allocated for job ID " + tdd.getJobID() +
+ " and allocation ID " + tdd.getAllocationID() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
@@ -307,10 +311,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
- if(taskSlot.add(task)) {
- TaskSlotMapping taskSlotMapping = new TaskSlotMapping(task, taskSlot);
+ boolean taskAdded;
- taskSlotMappings.put(task.getExecutionId(), taskSlotMapping);
+ try {
+ taskAdded = taskSlotTable.addTask(task);
+ } catch (SlotNotFoundException | SlotNotActiveException e) {
+ throw new TaskSubmissionException("Could not submit task.", e);
+ }
+
+ if (taskAdded) {
task.startTaskThread();
return Acknowledge.get();
@@ -325,7 +334,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
@RpcMethod
public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException {
- final Task task = getTask(executionAttemptID);
+ final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
try {
@@ -344,7 +353,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
@RpcMethod
public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException {
- final Task task = getTask(executionAttemptID);
+ final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
try {
@@ -367,7 +376,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
@RpcMethod
public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos) throws PartitionException {
- final Task task = getTask(executionAttemptID);
+ final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
for (final PartitionInfo partitionInfo: partitionInfos) {
@@ -430,7 +439,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
- final Task task = getTask(executionAttemptID);
+ final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
@@ -448,7 +457,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
- final Task task = getTask(executionAttemptID);
+ final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
task.notifyCheckpointComplete(checkpointId);
@@ -494,68 +503,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
return jobManagerConnections.get(jobManagerID);
}
- private Task getTask(ExecutionAttemptID executionAttemptID) {
- TaskSlotMapping taskSlotMapping = taskSlotMappings.get(executionAttemptID);
-
- if (taskSlotMapping != null) {
- return taskSlotMapping.getTask();
- } else {
- return null;
- }
- }
-
- private Task removeTask(ExecutionAttemptID executionAttemptID) {
- TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
-
- if (taskSlotMapping != null) {
- final Task task = taskSlotMapping.getTask();
- final TaskSlot taskSlot = taskSlotMapping.getTaskSlot();
-
- taskSlot.remove(task);
-
- return task;
- } else {
- return null;
- }
- }
-
- private Iterable<Task> getAllTasks() {
- final Iterator<TaskSlotMapping> taskEntryIterator = taskSlotMappings.values().iterator();
- final Iterator<Task> iterator = new Iterator<Task>() {
- @Override
- public boolean hasNext() {
- return taskEntryIterator.hasNext();
- }
-
- @Override
- public Task next() {
- return taskEntryIterator.next().getTask();
- }
-
- @Override
- public void remove() {
- taskEntryIterator.remove();
- }
- };
-
- return new Iterable<Task>() {
- @Override
- public Iterator<Task> iterator() {
- return iterator;
- }
- };
- }
-
- private void clearTasks() {
- taskSlotMappings.clear();
-
- for (TaskSlot taskSlot: taskSlots.values()) {
- taskSlot.clear();
- }
- }
-
private void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
- final Task task = getTask(executionAttemptID);
+ final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
try {
@@ -568,18 +517,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
- private void cancelAndClearAllTasks(Throwable cause) {
- log.info("Cancellaing all computations and discarding all cached data.");
-
- Iterable<Task> tasks = getAllTasks();
-
- for (Task task: tasks) {
- task.failExternally(cause);
- }
-
- clearTasks();
- }
-
private void updateTaskExecutionState(
final UUID jobMasterLeaderId,
final JobMasterGateway jobMasterGateway,
@@ -602,11 +539,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private void unregisterTaskAndNotifyFinalState(
final UUID jobMasterLeaderId,
- final JobMasterGateway jobMasterGateway,
- final ExecutionAttemptID executionAttemptID)
- {
- Task task = removeTask(executionAttemptID);
+ final JobMasterGateway jobMasterGateway,
+ final ExecutionAttemptID executionAttemptID) {
+ Task task = taskSlotTable.removeTask(executionAttemptID);
if (task != null) {
if (!task.getExecutionState().isTerminal()) {
try {
@@ -718,6 +654,41 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
+ private void freeSlot(AllocationID allocationId) {
+ Preconditions.checkNotNull(allocationId);
+
+ try {
+ int freedSlotIndex = taskSlotTable.freeSlot(allocationId);
+
+ if (freedSlotIndex != -1 && isConnectedToResourceManager()) {
+ // the slot was freed. Tell the RM about it
+ ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+
+ resourceManagerGateway.notifySlotAvailable(
+ resourceManagerConnection.getTargetLeaderId(),
+ resourceManagerConnection.getRegistrationId(),
+ new SlotID(getResourceID(), freedSlotIndex));
+ }
+ } catch (SlotNotFoundException e) {
+ log.debug("Could not free slot for allocation id {}.", allocationId, e);
+ }
+ }
+
+ private void timeoutSlot(AllocationID allocationId, UUID ticket) {
+ Preconditions.checkNotNull(allocationId);
+ Preconditions.checkNotNull(ticket);
+
+ if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
+ freeSlot(allocationId);
+ } else {
+ log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket);
+ }
+ }
+
+ private boolean isConnectedToResourceManager() {
+ return (resourceManagerConnection != null && resourceManagerConnection.isConnected());
+ }
+
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
@@ -778,7 +749,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
/**
* The listener for leader changes of the resource manager
*/
- private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
+ private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
@@ -796,7 +767,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
- private class TaskManagerActionsImpl implements TaskManagerActions {
+ private final class TaskManagerActionsImpl implements TaskManagerActions {
private final UUID jobMasterLeaderId;
private final JobMasterGateway jobMasterGateway;
@@ -837,4 +808,27 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
+ private class SlotActionsImpl implements SlotActions {
+
+ @Override
+ public void freeSlot(final AllocationID allocationId) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ TaskExecutor.this.freeSlot(allocationId);
+ }
+ });
+ }
+
+ @Override
+ public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ TaskExecutor.this.timeoutSlot(allocationId, ticket);
+ }
+ });
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index bb66655..ca1d2ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -98,6 +98,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
taskManagerServices.getTaskManagerMetricGroup(),
taskManagerServices.getBroadcastVariableManager(),
taskManagerServices.getFileCache(),
+ taskManagerServices.getTaskSlotTable(),
this);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e264a1c..c1728b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -38,6 +40,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -48,6 +52,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager},
@@ -65,6 +72,7 @@ public class TaskManagerServices {
private final TaskManagerMetricGroup taskManagerMetricGroup;
private final BroadcastVariableManager broadcastVariableManager;
private final FileCache fileCache;
+ private final TaskSlotTable taskSlotTable;
private TaskManagerServices(
TaskManagerLocation taskManagerLocation,
@@ -74,7 +82,8 @@ public class TaskManagerServices {
MetricRegistry metricRegistry,
TaskManagerMetricGroup taskManagerMetricGroup,
BroadcastVariableManager broadcastVariableManager,
- FileCache fileCache) {
+ FileCache fileCache,
+ TaskSlotTable taskSlotTable) {
this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
this.memoryManager = Preconditions.checkNotNull(memoryManager);
@@ -84,6 +93,7 @@ public class TaskManagerServices {
this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
this.fileCache = Preconditions.checkNotNull(fileCache);
+ this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
}
// --------------------------------------------------------------------------------------------
@@ -121,6 +131,10 @@ public class TaskManagerServices {
public FileCache getFileCache() {
return fileCache;
}
+
+ public TaskSlotTable getTaskSlotTable() {
+ return taskSlotTable;
+ }
// --------------------------------------------------------------------------------------------
// Static factory methods for task manager services
@@ -167,6 +181,16 @@ public class TaskManagerServices {
final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
+ final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
+
+ for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
+ resourceProfiles.add(new ResourceProfile(1.0, 42L));
+ }
+
+ final TimerService<AllocationID> timerService = new TimerService<>(new ScheduledThreadPoolExecutor(1));
+
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);
+
return new TaskManagerServices(
taskManagerLocation,
memoryManager,
@@ -175,7 +199,8 @@ public class TaskManagerServices {
metricRegistry,
taskManagerMetricGroup,
broadcastVariableManager,
- fileCache);
+ fileCache,
+ taskSlotTable);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 80dfc09..036a890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -173,6 +173,8 @@ public class TaskManagerServicesConfiguration {
final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
+
+
return new TaskManagerServicesConfiguration(
remoteAddress,
tmpDirs,
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
deleted file mode 100644
index 4fc1d66..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.Preconditions;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Container for multiple {@link Task} belonging to the same slot.
- */
-public class TaskSlot {
- private final AllocationID allocationID;
- private final ResourceID resourceID;
- private final Map<ExecutionAttemptID, Task> tasks;
-
- public TaskSlot(AllocationID allocationID, ResourceID resourceID) {
- this.allocationID = Preconditions.checkNotNull(allocationID);
- this.resourceID = Preconditions.checkNotNull(resourceID);
- tasks = new HashMap<>(4);
- }
-
- public AllocationID getAllocationID() {
- return allocationID;
- }
-
- public ResourceID getResourceID() {
- return resourceID;
- }
-
- public boolean add(Task task) {
- // sanity check
- Preconditions.checkArgument(allocationID.equals(task.getAllocationID()));
-
- Task oldTask = tasks.put(task.getExecutionId(), task);
-
- if (oldTask != null) {
- tasks.put(task.getExecutionId(), oldTask);
- return false;
- } else {
- return true;
- }
- }
-
- public Task remove(Task task) {
- return tasks.remove(task.getExecutionId());
- }
-
- public void clear() {
- tasks.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
deleted file mode 100644
index e67fd52..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Mapping between a {@link Task} and its {@link TaskSlot}.
- * <p>
- * Both references are final and non-null once constructed.
- */
-public class TaskSlotMapping {
-
- private final Task task;
- private final TaskSlot taskSlot;
-
- /**
- * @param task the mapped task; must not be null
- * @param taskSlot the slot containing the task; must not be null
- */
- public TaskSlotMapping(Task task, TaskSlot taskSlot) {
- this.task = Preconditions.checkNotNull(task);
- this.taskSlot = Preconditions.checkNotNull(taskSlot);
- }
-
- public Task getTask() {
- return task;
- }
-
- public TaskSlot getTaskSlot() {
- return taskSlot;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
new file mode 100644
index 0000000..f7ed235
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.util.UUID;
+
+/**
+ * Callbacks through which the {@link TaskSlotTable} asks its owner to act on a task slot.
+ */
+public interface SlotActions {
+
+	/**
+	 * Requests that the task slot belonging to the given allocation be freed.
+	 *
+	 * @param allocationId allocation whose task slot should be freed
+	 */
+	void freeSlot(AllocationID allocationId);
+
+	/**
+	 * Signals that a timeout fired for the task slot of the given allocation. The ticket
+	 * identifies the particular timeout so that invalid (outdated) timeouts can be filtered out.
+	 *
+	 * @param allocationId allocation whose task slot timed out
+	 * @param ticket identifier of the timeout which fired
+	 */
+	void timeoutSlot(AllocationID allocationId, UUID ticket);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
new file mode 100644
index 0000000..b0ddc5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Thrown when a {@link TaskSlot} is expected to be active for a given job and allocation id,
+ * but no such active slot exists.
+ */
+public class SlotNotActiveException extends Exception {
+
+	private static final long serialVersionUID = 4305837511564584L;
+
+	/**
+	 * @param jobId job for which an active slot was expected
+	 * @param allocationId allocation id for which an active slot was expected
+	 */
+	public SlotNotActiveException(JobID jobId, AllocationID allocationId) {
+		super(buildMessage(jobId, allocationId));
+	}
+
+	/** Assembles the detail message for the given job and allocation id. */
+	private static String buildMessage(JobID jobId, AllocationID allocationId) {
+		return "No active slot for job " + jobId + " with allocation id " + allocationId + '.';
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
new file mode 100644
index 0000000..c639b16
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Thrown when no {@link TaskSlot} can be found, e.g. for a given allocation id.
+ */
+public class SlotNotFoundException extends Exception {
+
+	private static final long serialVersionUID = -883614807750137925L;
+
+	/**
+	 * Creates the exception for an allocation id without a matching slot.
+	 *
+	 * @param allocationId allocation id for which no slot exists
+	 */
+	public SlotNotFoundException(AllocationID allocationId) {
+		super("Could not find slot for " + allocationId + '.');
+	}
+
+	/**
+	 * Creates the exception with a custom detail message.
+	 *
+	 * @param message detail message
+	 */
+	public SlotNotFoundException(String message) {
+		super(message);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
new file mode 100644
index 0000000..0942772
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Container for multiple {@link Task} belonging to the same slot. A {@link TaskSlot} can be in one
+ * of the following states:
+ * <ul>
+ * <li>Free - The slot is empty and not allocated to a job</li>
+ * <li>Releasing - The slot is about to be freed after it has become empty.</li>
+ * <li>Allocated - The slot has been allocated for a job.</li>
+ * <li>Active - The slot is in active use by a job manager which is the leader of the allocating job.</li>
+ * </ul>
+ * <p>
+ * A task slot can only be allocated if it is in state free. An allocated task slot can transition
+ * to state active.
+ * <p>
+ * An active slot allows to add tasks from the respective job and with the correct allocation id.
+ * An active slot can be marked as inactive which sets the state back to allocated.
+ * <p>
+ * A slot can only be freed if it is empty. If it is not empty, then its state can be set to
+ * releasing indicating that it can be freed once it becomes empty.
+ * <p>
+ * Note: this class contains no internal synchronization.
+ */
+public class TaskSlot {
+
+	/** Index of the task slot */
+	private final int index;
+
+	/** Resource characteristics for this slot */
+	private final ResourceProfile resourceProfile;
+
+	/** Tasks running in this slot, keyed by their execution attempt id */
+	private final Map<ExecutionAttemptID, Task> tasks;
+
+	/** State of this slot */
+	private TaskSlotState state;
+
+	/** Job id to which the slot has been allocated; null if the slot is free */
+	private JobID jobId;
+
+	/** Allocation id of this slot; null if the slot is free */
+	private AllocationID allocationId;
+
+	/**
+	 * Creates a free task slot.
+	 *
+	 * @param index of the task slot; must not be negative
+	 * @param resourceProfile resource characteristics of the slot; must not be null
+	 */
+	TaskSlot(final int index, final ResourceProfile resourceProfile) {
+		Preconditions.checkArgument(0 <= index, "The index must be greater than or equal to 0.");
+		this.index = index;
+		this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+
+		this.tasks = new HashMap<>(4);
+		this.state = TaskSlotState.FREE;
+
+		this.jobId = null;
+		this.allocationId = null;
+	}
+
+	// ----------------------------------------------------------------------------------
+	// State accessors
+	// ----------------------------------------------------------------------------------
+
+	public int getIndex() {
+		return index;
+	}
+
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	/** @return the job id this slot has been allocated for, or null if the slot is free */
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	/** @return the allocation id of this slot, or null if the slot is free */
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	TaskSlotState getState() {
+		return state;
+	}
+
+	/** @return true if no task is contained in this slot */
+	public boolean isEmpty() {
+		return tasks.isEmpty();
+	}
+
+	public boolean isFree() {
+		return TaskSlotState.FREE == state;
+	}
+
+	/**
+	 * Checks whether this slot is active for exactly the given job and allocation id.
+	 *
+	 * @param activeJobId job id to check; must not be null
+	 * @param activeAllocationId allocation id to check; must not be null
+	 * @return true if the slot is active and allocated for the given job and allocation id
+	 */
+	public boolean isActive(JobID activeJobId, AllocationID activeAllocationId) {
+		Preconditions.checkNotNull(activeJobId);
+		Preconditions.checkNotNull(activeAllocationId);
+
+		return TaskSlotState.ACTIVE == state &&
+			activeJobId.equals(jobId) &&
+			activeAllocationId.equals(allocationId);
+	}
+
+	/**
+	 * Checks whether this slot is allocated or active for exactly the given job and allocation id.
+	 *
+	 * @param jobIdToCheck job id to check; must not be null
+	 * @param allocationIDToCheck allocation id to check; must not be null
+	 * @return true if the slot is allocated or active for the given job and allocation id
+	 */
+	public boolean isAllocated(JobID jobIdToCheck, AllocationID allocationIDToCheck) {
+		Preconditions.checkNotNull(jobIdToCheck);
+		Preconditions.checkNotNull(allocationIDToCheck);
+
+		return jobIdToCheck.equals(jobId) && allocationIDToCheck.equals(allocationId) &&
+			(TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state);
+	}
+
+	public boolean isReleasing() {
+		return TaskSlotState.RELEASING == state;
+	}
+
+	/**
+	 * Get all tasks running in this task slot. The iterator is backed by the internal task map.
+	 *
+	 * @return Iterator to all currently contained tasks in this task slot.
+	 */
+	public Iterator<Task> getTasks() {
+		return tasks.values().iterator();
+	}
+
+	// ----------------------------------------------------------------------------------
+	// State changing methods
+	// ----------------------------------------------------------------------------------
+
+	/**
+	 * Adds the given task to this slot, unless a task with the same execution attempt id has
+	 * already been added. If the task was added, true is returned; otherwise the task slot is
+	 * left unchanged and false is returned.
+	 *
+	 * @param task to be added to the task slot
+	 * @throws IllegalArgumentException if the task's job id or allocation id does not match the
+	 *                                  job id and allocation id this slot has been allocated for
+	 * @throws IllegalStateException if the task slot is not in state active
+	 * @return true if the task was added to the task slot; otherwise false
+	 */
+	public boolean add(Task task) {
+		// Check that this slot has been assigned to the job sending this task
+		Preconditions.checkArgument(task.getJobID().equals(jobId), "The task's job id does not match the " +
+			"job id for which the slot has been allocated.");
+		Preconditions.checkArgument(task.getAllocationId().equals(allocationId), "The task's allocation " +
+			"id does not match the allocation id for which the slot has been allocated.");
+		Preconditions.checkState(TaskSlotState.ACTIVE == state, "The task slot is not in state active.");
+
+		final ExecutionAttemptID executionId = task.getExecutionId();
+
+		if (tasks.containsKey(executionId)) {
+			// never replace a task which has already been added under the same execution attempt
+			return false;
+		}
+
+		tasks.put(executionId, task);
+		return true;
+	}
+
+	/**
+	 * Remove the task identified by the given execution attempt id.
+	 *
+	 * @param executionAttemptId identifying the task to be removed
+	 * @return The removed task if there was any; otherwise null.
+	 */
+	public Task remove(ExecutionAttemptID executionAttemptId) {
+		return tasks.remove(executionAttemptId);
+	}
+
+	/**
+	 * Removes all tasks from this task slot.
+	 */
+	public void clear() {
+		tasks.clear();
+	}
+
+	/**
+	 * Allocate the task slot for the given job and allocation id. If the slot could be allocated,
+	 * or is already allocated/active for the given job and allocation id, then the method returns
+	 * true. Otherwise it returns false.
+	 * <p>
+	 * A slot can only be newly allocated if its current state is free.
+	 *
+	 * @param newJobId to allocate the slot for; must not be null
+	 * @param newAllocationId to identify the slot allocation; must not be null
+	 * @return True if the slot is allocated for the given job and allocation id; otherwise false
+	 */
+	public boolean allocate(JobID newJobId, AllocationID newAllocationId) {
+		if (TaskSlotState.FREE == state) {
+			// sanity checks
+			Preconditions.checkState(allocationId == null);
+			Preconditions.checkState(jobId == null);
+
+			this.jobId = Preconditions.checkNotNull(newJobId);
+			this.allocationId = Preconditions.checkNotNull(newAllocationId);
+
+			state = TaskSlotState.ALLOCATED;
+
+			return true;
+		} else if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) {
+			Preconditions.checkNotNull(newJobId);
+			Preconditions.checkNotNull(newAllocationId);
+
+			// idempotent for the same job and allocation id
+			return newJobId.equals(jobId) && newAllocationId.equals(allocationId);
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Mark this slot as active. A slot can only be marked active if it is allocated or already
+	 * active.
+	 *
+	 * @return True if the new state of the slot is active; otherwise false
+	 */
+	public boolean markActive() {
+		if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) {
+			state = TaskSlotState.ACTIVE;
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Mark the slot as inactive/allocated. A slot can only be marked as inactive/allocated if it
+	 * is in state allocated or active.
+	 *
+	 * @return True if the new state of the slot is allocated; otherwise false
+	 */
+	public boolean markInactive() {
+		if (TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state) {
+			state = TaskSlotState.ALLOCATED;
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Mark the slot as free. A slot can only be marked as free if it is empty; this holds
+	 * regardless of its current state. Marking the slot free clears its job id and allocation id.
+	 *
+	 * @return True if the new state is free; otherwise false
+	 */
+	public boolean markFree() {
+		if (isEmpty()) {
+			state = TaskSlotState.FREE;
+			this.jobId = null;
+			this.allocationId = null;
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Mark this slot as releasing. A slot can always be marked as releasing; its job id and
+	 * allocation id are kept.
+	 *
+	 * @return True
+	 */
+	public boolean markReleasing() {
+		state = TaskSlotState.RELEASING;
+		return true;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
new file mode 100644
index 0000000..e3ba903
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+/**
+ * States a {@link TaskSlot} can be in. Package-private, i.e. only used within this package.
+ */
+enum TaskSlotState {
+	/** The slot is in active use by a job manager responsible for a job. */
+	ACTIVE,
+	/** The slot has been allocated for a job but not yet given to a job manager. */
+	ALLOCATED,
+	/** The slot is not empty but its tasks are failed; once all tasks are removed it will be released. */
+	RELEASING,
+	/** The slot is free. */
+	FREE
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
new file mode 100644
index 0000000..42cb919
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Container for multiple {@link TaskSlot} instances. Additionally, it maintains multiple indices
+ * for faster access to tasks and sets of allocated slots.
+ * <p>
+ * The task slot table automatically registers timeouts for allocated slots which cannot be assigned
+ * to a job manager.
+ * <p>
+ * Before the task slot table can be used, it must be started via the {@link #start} method.
+ */
+public class TaskSlotTable implements TimeoutListener<AllocationID> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);
+
+ /** Timer service used to time out allocated slots; this table is its timeout listener */
+ private final TimerService<AllocationID> timerService;
+
+ /** The list of all task slots; fixed size, the slot at position i has index i */
+ private final List<TaskSlot> taskSlots;
+
+ /** Mapping from allocation id to task slot; filled on allocation, cleared when the slot is freed */
+ private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;
+
+ /** Mapping from execution attempt id to task and task slot */
+ private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+
+ /** Mapping from job id to the allocation ids of the job's slots; a job's entry is removed with its last slot */
+ private final Map<JobID, Set<AllocationID>> slotsPerJob;
+
+ /** Interface for slot actions, such as freeing them or timing them out; set by start, cleared by stop */
+ private SlotActions slotActions;
+
+ /** The timeout for allocated slots; set by start, cleared by stop */
+ private Time slotTimeout;
+
+ /** Whether the table has been started */
+ private boolean started;
+
+	/**
+	 * Creates a task slot table with one free {@link TaskSlot} per given resource profile. The
+	 * slot created for the i-th profile (in iteration order) receives index i.
+	 *
+	 * @param resourceProfiles resource profiles of the slots to create; must not be empty
+	 * @param timerService service used to register slot timeouts; must not be null
+	 */
+	public TaskSlotTable(
+			final Collection<ResourceProfile> resourceProfiles,
+			final TimerService<AllocationID> timerService) {
+
+		final int slotCount = resourceProfiles.size();
+
+		Preconditions.checkArgument(slotCount > 0, "The number of task slots must be greater than 0.");
+
+		this.timerService = Preconditions.checkNotNull(timerService);
+
+		// fill a fixed-size array so that the position of each slot equals its index
+		final TaskSlot[] slots = new TaskSlot[slotCount];
+		final Iterator<ResourceProfile> profiles = resourceProfiles.iterator();
+
+		for (int slotIndex = 0; slotIndex < slotCount; slotIndex++) {
+			slots[slotIndex] = new TaskSlot(slotIndex, profiles.next());
+		}
+
+		this.taskSlots = Arrays.asList(slots);
+
+		this.allocationIDTaskSlotMap = new HashMap<>(slotCount);
+		this.taskSlotMappings = new HashMap<>(4 * slotCount);
+		this.slotsPerJob = new HashMap<>(4);
+
+		// the remaining state is only initialized once the table is started
+		this.slotActions = null;
+		this.slotTimeout = null;
+		this.started = false;
+	}
+
+	/**
+	 * Starts the table: stores the slot actions and the slot timeout, then starts the timer
+	 * service with this table as its timeout listener.
+	 *
+	 * @param initialSlotActions slot actions to use; must not be null
+	 * @param initialSlotTimeout timeout for allocated slots; must not be null
+	 */
+	public void start(SlotActions initialSlotActions, Time initialSlotTimeout) {
+		slotActions = Preconditions.checkNotNull(initialSlotActions);
+		slotTimeout = Preconditions.checkNotNull(initialSlotTimeout);
+
+		timerService.start(this);
+		started = true;
+	}
+
+	/**
+	 * Stops the table: marks it as not started, stops the timer service and clears the slot
+	 * actions and slot timeout that were set when starting.
+	 */
+	public void stop() {
+		this.started = false;
+		this.timerService.stop();
+
+		this.slotTimeout = null;
+		this.slotActions = null;
+	}
+
+ // ---------------------------------------------------------------------
+ // Slot methods
+ // ---------------------------------------------------------------------
+
+	/**
+	 * Allocate the slot with the given index for the given job and allocation id. Returns true if
+	 * the slot could be allocated, or if it is already allocated/active for exactly this job and
+	 * allocation id. Otherwise it returns false.
+	 * <p>
+	 * A slot timeout is (re-)registered unless the slot is already active: active slots have their
+	 * timeout unregistered in {@link #markSlotActive}, so re-allocating them must not arm a new one.
+	 *
+	 * @param index of the task slot to allocate
+	 * @param jobId to allocate the task slot for
+	 * @param allocationId identifying the allocation
+	 * @return True if the task slot could be allocated; otherwise false
+	 */
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) {
+		checkInit();
+
+		TaskSlot taskSlot = taskSlots.get(index);
+
+		boolean result = taskSlot.allocate(jobId, allocationId);
+
+		if (result) {
+			// update the allocation id to task slot map
+			allocationIDTaskSlotMap.put(allocationId, taskSlot);
+
+			// only a slot which is merely allocated (not active) may time out
+			if (!taskSlot.isActive(jobId, allocationId)) {
+				timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
+			}
+
+			// add this slot to the set of job slots
+			Set<AllocationID> slots = slotsPerJob.get(jobId);
+
+			if (slots == null) {
+				slots = new HashSet<>(4);
+				slotsPerJob.put(jobId, slots);
+			}
+
+			slots.add(allocationId);
+		}
+
+		return result;
+	}
+
+	/**
+	 * Marks the slot of the given allocation id as active and, if that succeeded, unregisters
+	 * any timeout registered for it.
+	 *
+	 * @param allocationId identifying the task slot to activate
+	 * @throws SlotNotFoundException if no slot exists for the given allocation id
+	 * @return True if the slot could be marked active; otherwise false
+	 */
+	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
+		checkInit();
+
+		final TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot == null) {
+			throw new SlotNotFoundException(allocationId);
+		}
+
+		final boolean activated = taskSlot.markActive();
+
+		if (activated) {
+			// an active slot must not time out anymore
+			timerService.unregisterTimeout(allocationId);
+		}
+
+		return activated;
+	}
+
+	/**
+	 * Marks the slot of the given allocation id as inactive (allocated) and, if that succeeded,
+	 * registers a timeout after which the slot is to be freed.
+	 *
+	 * @param allocationId identifying the task slot to deactivate
+	 * @throws SlotNotFoundException if no slot exists for the given allocation id
+	 * @return True if the slot could be marked inactive; otherwise false
+	 */
+	public boolean markSlotInactive(AllocationID allocationId) throws SlotNotFoundException {
+		checkInit();
+
+		final TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot == null) {
+			throw new SlotNotFoundException(allocationId);
+		}
+
+		final boolean deactivated = taskSlot.markInactive();
+
+		if (deactivated) {
+			// an inactive slot is freed if it is not activated again in time
+			timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
+		}
+
+		return deactivated;
+	}
+
+	/**
+	 * Tries to free the slot of the given allocation id, failing any contained tasks with a
+	 * generic cause. See {@link #freeSlot(AllocationID, Throwable)} for the exact semantics.
+	 *
+	 * @param allocationId identifying the task slot to be freed
+	 * @throws SlotNotFoundException if there is no task slot for the given allocation id
+	 * @return Index of the freed slot if the slot could be freed; otherwise -1
+	 */
+	public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+		final Exception cause = new Exception("The task slot of this task is being freed.");
+		return freeSlot(allocationId, cause);
+	}
+
+	/**
+	 * Tries to free the slot of the given allocation id.
+	 * <p>
+	 * If the slot is empty, it is set to free, removed from all indices, its timeout is
+	 * unregistered and its index is returned. Otherwise the slot is set to releasing, all of its
+	 * tasks are failed with the given cause and -1 is returned.
+	 *
+	 * @param allocationId identifying the task slot to be freed
+	 * @param cause to fail the tasks with if the slot is not empty
+	 * @throws SlotNotFoundException if there is no task slot for the given allocation id
+	 * @throws IllegalStateException if the job of the freed slot has no registered slots
+	 * @return Index of the freed slot if the slot could be freed; otherwise -1
+	 */
+	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+		checkInit();
+
+		final TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot == null) {
+			throw new SlotNotFoundException(allocationId);
+		}
+
+		LOG.info("Free slot {}.", allocationId, cause);
+
+		// read the job id before marking the slot free, because markFree resets it
+		final JobID jobId = taskSlot.getJobId();
+
+		if (!taskSlot.markFree()) {
+			// the slot still contains tasks: mark it as releasing so that it eventually gets
+			// freed, and fail all of its tasks
+			taskSlot.markReleasing();
+
+			for (Iterator<Task> remainingTasks = taskSlot.getTasks(); remainingTasks.hasNext(); ) {
+				remainingTasks.next().failExternally(cause);
+			}
+
+			return -1;
+		}
+
+		allocationIDTaskSlotMap.remove(allocationId);
+		timerService.unregisterTimeout(allocationId);
+
+		final Set<AllocationID> jobSlots = slotsPerJob.get(jobId);
+
+		if (jobSlots == null) {
+			throw new IllegalStateException("There are no more slots allocated for the job " + jobId +
+				". This indicates a programming bug.");
+		}
+
+		jobSlots.remove(allocationId);
+
+		if (jobSlots.isEmpty()) {
+			slotsPerJob.remove(jobId);
+		}
+
+		return taskSlot.getIndex();
+	}
+
+	/**
+	 * Asks the timer service whether the timeout with the given ticket is valid for the given
+	 * allocation id.
+	 *
+	 * @param allocationId to check against
+	 * @param ticket of the timeout
+	 * @return True if the timeout is valid; otherwise false
+	 */
+	public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+		checkInit();
+
+		final boolean valid = timerService.isValid(allocationId, ticket);
+		return valid;
+	}
+
+	/**
+	 * Tells whether the slot at the given index is allocated or active for exactly the given job
+	 * and allocation id.
+	 *
+	 * @param index of the task slot
+	 * @param jobId expected job id of the slot
+	 * @param allocationId expected allocation id of the slot
+	 * @return True if the slot is allocated for the given job and allocation id
+	 */
+	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+		checkInit();
+
+		return taskSlots.get(index).isAllocated(jobId, allocationId);
+	}
+
+	/**
+	 * Tells whether a slot exists for the given allocation id which is active for the given job.
+	 *
+	 * @param jobId of the allocated slot
+	 * @param allocationId identifying the allocation
+	 * @return True if such an active task slot exists; otherwise false
+	 */
+	public boolean existActiveSlot(JobID jobId, AllocationID allocationId) {
+		final TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		return taskSlot != null && taskSlot.isActive(jobId, allocationId);
+	}
+
+	/**
+	 * Tells whether the slot at the given index is free.
+	 *
+	 * @param index of the task slot
+	 * @return True if the task slot is free; otherwise false
+	 */
+	public boolean isSlotFree(int index) {
+		return taskSlots.get(index).isFree();
+	}
+
+	/**
+	 * Tells whether the given job has at least one slot which is allocated but not active.
+	 *
+	 * @param jobId job to check
+	 * @return True if there are allocated slots for the given job id
+	 */
+	public boolean hasAllocatedSlots(JobID jobId) {
+		final Iterator<AllocationID> allocatedSlots = getAllocatedSlots(jobId);
+
+		return allocatedSlots.hasNext();
+	}
+
+ /**
+ * Return an iterator of allocated slots (their allocation ids) for the given job id.
+ *
+ * @param jobId for which to return the allocated slots
+ * @return Iterator of allocation ids of allocated slots.
+ */
+ public Iterator<AllocationID> getAllocatedSlots(JobID jobId) {
+ return new AllocationIDIterator(jobId, TaskSlotState.ALLOCATED);
+ }
+
+ /**
+ * Return an iterator of active slots (their application ids) for the given job id.
+ *
+ * @param jobId for which to return the active slots
+ * @return Iterator of allocation ids of active slots
+ */
+ public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+ return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
+ }
+
+ // ---------------------------------------------------------------------
+ // Task methods
+ // ---------------------------------------------------------------------
+
+ /**
+ * Add the given task to the slot identified by the task's allocation id.
+ *
+ * @param task to add to the task slot with the respective allocation id
+ * @throws SlotNotFoundException if there was no slot for the given allocation id
+ * @throws SlotNotActiveException if there was no slot active for task's job and allocation id
+ * @return True if the task could be added to the task slot; otherwise false
+ */
+ public boolean addTask(Task task) throws SlotNotFoundException, SlotNotActiveException {
+ Preconditions.checkNotNull(task);
+
+ TaskSlot taskSlot = getTaskSlot(task.getAllocationId());
+
+ if (taskSlot != null) {
+ if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
+ if (taskSlot.add(task)) {
+ taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping(task, taskSlot));
+
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
+ }
+ } else {
+ throw new SlotNotFoundException(taskSlot.getAllocationId());
+ }
+ }
+
+ /**
+ * Remove the task with the given execution attempt id from its task slot. If the owning task
+ * slot is in state releasing and empty after removing the task, the slot is freed via the
+ * slot actions.
+ *
+ * @param executionAttemptID identifying the task to remove
+ * @return The removed task if there is any for the given execution attempt id; otherwise null
+ */
+ public Task removeTask(ExecutionAttemptID executionAttemptID) {
+ TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
+
+ if (taskSlotMapping != null) {
+ Task task = taskSlotMapping.getTask();
+ TaskSlot taskSlot = taskSlotMapping.getTaskSlot();
+
+ taskSlot.remove(task.getExecutionId());
+
+ if (taskSlot.isReleasing() && taskSlot.isEmpty()) {
+ slotActions.freeSlot(taskSlot.getAllocationId());
+ }
+
+ return task;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Get the task for the given execution attempt id. If none could be found, then return null.
+ *
+ * @param executionAttemptID identifying the requested task
+ * @return The task for the given execution attempt id if it exist; otherwise null
+ */
+ public Task getTask(ExecutionAttemptID executionAttemptID) {
+ TaskSlotMapping taskSlotMapping = taskSlotMappings.get(executionAttemptID);
+
+ if (taskSlotMapping != null) {
+ return taskSlotMapping.getTask();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Return an iterator over all tasks for a given job.
+ *
+ * @param jobId identifying the job of the requested tasks
+ * @return Iterator over all task for a given job
+ */
+ public Iterator<Task> getTasks(JobID jobId) {
+ return new TaskIterator(jobId);
+ }
+
+ // ---------------------------------------------------------------------
+ // TimeoutListener methods
+ // ---------------------------------------------------------------------
+
+ @Override
+ public void notifyTimeout(AllocationID key, UUID ticket) {
+ if (slotActions != null) {
+ slotActions.timeoutSlot(key, ticket);
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // Internal methods
+ // ---------------------------------------------------------------------
+
+ private TaskSlot getTaskSlot(AllocationID allocationId) {
+ Preconditions.checkNotNull(allocationId);
+
+ TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId);
+
+ return taskSlot;
+ }
+
+ private void checkInit() {
+ Preconditions.checkState(started, "The " + TaskSlotTable.class.getSimpleName() + " has to be started.");
+ }
+
+ // ---------------------------------------------------------------------
+ // Static utility classes
+ // ---------------------------------------------------------------------
+
+ /**
+ * Mapping class between a {@link Task} and a {@link TaskSlot}.
+ */
+ private static final class TaskSlotMapping {
+ private final Task task;
+ private final TaskSlot taskSlot;
+
+
+ private TaskSlotMapping(Task task, TaskSlot taskSlot) {
+ this.task = Preconditions.checkNotNull(task);
+ this.taskSlot = Preconditions.checkNotNull(taskSlot);
+ }
+
+ public Task getTask() {
+ return task;
+ }
+
+ public TaskSlot getTaskSlot() {
+ return taskSlot;
+ }
+ }
+
+ /**
+ * Iterator over {@link AllocationID} of the {@link TaskSlot} of a given job. Additionally,
+ * the task slots identified by the allocation ids are in the given state.
+ */
+ private final class AllocationIDIterator implements Iterator<AllocationID> {
+ private final Iterator<TaskSlot> iterator;
+
+ private AllocationIDIterator(JobID jobId, TaskSlotState state) {
+ iterator = new TaskSlotIterator(jobId, state);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public AllocationID next() {
+ try {
+ return iterator.next().getAllocationId();
+ } catch (NoSuchElementException e) {
+ throw new NoSuchElementException("No more allocation ids.");
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot remove allocation ids via this iterator.");
+ }
+ }
+
+ /**
+ * Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given
+ * job.
+ */
+ private final class TaskSlotIterator implements Iterator<TaskSlot> {
+ private final Iterator<AllocationID> allSlots;
+ private final TaskSlotState state;
+
+ private TaskSlot currentSlot;
+
+ private TaskSlotIterator(JobID jobId, TaskSlotState state) {
+
+ Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+
+ if (allocationIds == null || allocationIds.isEmpty()) {
+ allSlots = Collections.emptyIterator();
+ } else {
+ allSlots = allocationIds.iterator();
+ }
+
+ this.state = Preconditions.checkNotNull(state);
+
+ this.currentSlot = null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (currentSlot == null && allSlots.hasNext()) {
+ AllocationID tempSlot = allSlots.next();
+
+ TaskSlot taskSlot = getTaskSlot(tempSlot);
+
+ if (taskSlot != null && taskSlot.getState() == state) {
+ currentSlot = taskSlot;
+ }
+ }
+
+ return currentSlot != null;
+ }
+
+ @Override
+ public TaskSlot next() {
+ if (currentSlot != null) {
+ TaskSlot result = currentSlot;
+
+ currentSlot = null;
+
+ return result;
+ } else {
+ while (true) {
+ AllocationID tempSlot;
+
+ try {
+ tempSlot = allSlots.next();
+ } catch (NoSuchElementException e) {
+ throw new NoSuchElementException("No more task slots.");
+ }
+
+ TaskSlot taskSlot = getTaskSlot(tempSlot);
+
+ if (taskSlot != null && taskSlot.getState() == state) {
+ return taskSlot;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot remove task slots via this iterator.");
+ }
+ }
+
+ /**
+ * Iterator over all {@link Task} for a given job
+ */
+ private final class TaskIterator implements Iterator<Task> {
+ private final Iterator<TaskSlot> taskSlotIterator;
+
+ private Iterator<Task> currentTasks;
+
+ private TaskIterator(JobID jobId) {
+ this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
+
+ this.currentTasks = null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while ((currentTasks == null || !currentTasks.hasNext()) && taskSlotIterator.hasNext()) {
+ TaskSlot taskSlot = taskSlotIterator.next();
+
+ currentTasks = taskSlot.getTasks();
+ }
+
+ return (currentTasks != null && currentTasks.hasNext());
+ }
+
+ @Override
+ public Task next() {
+ while ((currentTasks == null || !currentTasks.hasNext())) {
+ TaskSlot taskSlot;
+
+ try {
+ taskSlot = taskSlotIterator.next();
+ } catch (NoSuchElementException e) {
+ throw new NoSuchElementException("No more tasks.");
+ }
+
+ currentTasks = taskSlot.getTasks();
+ }
+
+ return currentTasks.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
new file mode 100644
index 0000000..3e75d74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import java.util.UUID;
+
/**
 * Callback through which the {@link TimerService} reports expired timeouts.
 *
 * @param <K> Type of the key identifying a timeout
 */
public interface TimeoutListener<K> {

	/**
	 * Invoked when the timeout registered for {@code key} has expired. The accompanying ticket
	 * identifies the particular timeout registration. Implementations can use it to recognize
	 * events belonging to a registration that has since been replaced or cancelled.
	 *
	 * @param key identifying the event which timed out
	 * @param ticket identifying the timeout registration; used to check whether it is still valid
	 */
	void notifyTimeout(K key, UUID ticket);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
new file mode 100644
index 0000000..e28e801
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service to register timeouts for a given key. The timeouts are identified by a ticket so that
+ * newly registered timeouts for the same key can be distinguished from older timeouts.
+ *
+ * @param <K> Type of the key
+ */
+public class TimerService<K> {
+
+ /** Executor service for the scheduled timeouts */
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ /** Map of currently active timeouts */
+ private final Map<K, Timeout<K>> timeouts;
+
+ /** Listener which is notified about occurring timeouts */
+ private TimeoutListener<K> timeoutListener;
+
+ public TimerService(final ScheduledExecutorService scheduledExecutorService) {
+ this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+
+ this.timeouts = new HashMap<>(16);
+ this.timeoutListener = null;
+ }
+
+ public void start(TimeoutListener<K> initialTimeoutListener) {
+ // sanity check; We only allow to assign a timeout listener once
+ Preconditions.checkState(!scheduledExecutorService.isShutdown());
+ Preconditions.checkState(timeoutListener == null);
+
+ this.timeoutListener = Preconditions.checkNotNull(initialTimeoutListener);
+ }
+
+ public void stop() {
+ for (K key: timeouts.keySet()) {
+ unregisterTimeout(key);
+ }
+
+ timeoutListener = null;
+
+ scheduledExecutorService.shutdown();
+ }
+
+ /**
+ * Register a timeout for the given key which shall occur in the given delay.
+ *
+ * @param key for which to register the timeout
+ * @param delay until the timeout
+ * @param unit of the timeout delay
+ */
+ public void registerTimeout(final K key, final long delay, final TimeUnit unit) {
+ Preconditions.checkState(timeoutListener != null, "The " + getClass().getSimpleName() +
+ " has not been started.");
+
+ if (timeouts.containsKey(key)) {
+ unregisterTimeout(key);
+ }
+
+ timeouts.put(key, new Timeout<>(timeoutListener, key, delay, unit, scheduledExecutorService));
+ }
+
+ /**
+ * Unregister the timeout for the given key.
+ *
+ * @param key for which to unregister the timeout
+ */
+ public void unregisterTimeout(K key) {
+ Timeout<K> timeout = timeouts.remove(key);
+
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ }
+
+ /**
+ * Check whether the timeout for the given key and ticket is still valid (not yet unregistered
+ * and not yet overwritten).
+ *
+ * @param key for which to check the timeout
+ * @param ticket of the timeout
+ * @return True if the timeout ticket is still valid; otherwise false
+ */
+ public boolean isValid(K key, UUID ticket) {
+ if (timeouts.containsKey(key)) {
+ Timeout<K> timeout = timeouts.get(key);
+
+ return timeout.getTicket().equals(ticket);
+ } else {
+ return false;
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // Static utility classes
+ // ---------------------------------------------------------------------
+
+ private static final class Timeout<K> implements Runnable {
+
+ private final TimeoutListener<K> timeoutListener;
+ private final K key;
+ private final ScheduledFuture<?> scheduledTimeout;
+ private final UUID ticket;
+
+ Timeout(
+ final TimeoutListener<K> timeoutListener,
+ final K key,
+ final long delay,
+ final TimeUnit unit,
+ final ScheduledExecutorService scheduledExecutorService) {
+
+ Preconditions.checkNotNull(scheduledExecutorService);
+
+ this.timeoutListener = Preconditions.checkNotNull(timeoutListener);
+ this.key = Preconditions.checkNotNull(key);
+ this.scheduledTimeout = scheduledExecutorService.schedule(this, delay, unit);
+ this.ticket = UUID.randomUUID();
+ }
+
+ UUID getTicket() {
+ return ticket;
+ }
+
+ void cancel() {
+ scheduledTimeout.cancel(true);
+ }
+
+ @Override
+ public void run() {
+ timeoutListener.notifyTimeout(key, ticket);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 37ac0a3..f16255e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -128,7 +128,7 @@ public class Task implements Runnable, TaskActions {
private final ExecutionAttemptID executionId;
/** ID which identifies the slot in which the task is supposed to run */
- private final AllocationID allocationID;
+ private final AllocationID allocationId;
/** TaskInfo object for this task */
private final TaskInfo taskInfo;
@@ -265,7 +265,7 @@ public class Task implements Runnable, TaskActions {
this.jobId = checkNotNull(tdd.getJobID());
this.vertexId = checkNotNull(tdd.getVertexID());
this.executionId = checkNotNull(tdd.getExecutionId());
- this.allocationID = checkNotNull(tdd.getAllocationID());
+ this.allocationId = checkNotNull(tdd.getAllocationID());
this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
@@ -370,8 +370,8 @@ public class Task implements Runnable, TaskActions {
return executionId;
}
- public AllocationID getAllocationID() {
- return allocationID;
+ public AllocationID getAllocationId() {
+ return allocationId;
}
public TaskInfo getTaskInfo() {
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index af8aa69..97f42b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.reader.BufferReader;
http://git-wip-us.apache.org/repos/asf/flink/blob/5f3adc9f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index ecbd9b5..baae251 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
@@ -83,6 +84,7 @@ public class TaskExecutorTest extends TestLogger {
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
+ mock(TaskSlotTable.class),
mock(FatalErrorHandler.class));
taskManager.start();
@@ -139,6 +141,7 @@ public class TaskExecutorTest extends TestLogger {
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
+ mock(TaskSlotTable.class),
mock(FatalErrorHandler.class));
taskManager.start();
@@ -211,6 +214,7 @@ public class TaskExecutorTest extends TestLogger {
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
+ mock(TaskSlotTable.class),
mock(FatalErrorHandler.class));
taskManager.start();