You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/30 09:49:59 UTC
[6/7] flink git commit: [FLINK-8505] [flip6] Prevent SlotManager from
reaching an inconsistent state
[FLINK-8505] [flip6] Prevent SlotManager from reaching an inconsistent state
The SlotManager could reach an inconsistent state when a formerly free slot was reported
as allocated by an incoming SlotReport. This state transition did not remove the slot
from the set of free slots. As a consequence, the now-allocated slot was still considered
for future SlotRequests, which caused a failure with an IllegalStateException.
The problem is solved by removing a free slot from the set of free slots as soon as a
slot update reports it as allocated.
This closes #5354.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/492d1b45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/492d1b45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/492d1b45
Branch: refs/heads/master
Commit: 492d1b45c283c6611added2b1c750bb2b014d23c
Parents: 03d042a
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jan 24 18:32:47 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 29 17:05:57 2018 +0100
----------------------------------------------------------------------
.../clusterframework/types/TaskManagerSlot.java | 13 ++--
.../slotmanager/SlotManager.java | 77 ++++++++++----------
.../slotmanager/SlotManagerTest.java | 64 ++++++++++++++++
3 files changed, 109 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/492d1b45/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
index f92f9ac..fb7fce3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
@@ -33,19 +33,19 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class TaskManagerSlot {
- /** The unique identification of this slot */
+ /** The unique identification of this slot. */
private final SlotID slotId;
- /** The resource profile of this slot */
+ /** The resource profile of this slot. */
private final ResourceProfile resourceProfile;
- /** Gateway to the TaskExecutor which owns the slot */
+ /** Gateway to the TaskExecutor which owns the slot. */
private final TaskExecutorConnection taskManagerConnection;
- /** Allocation id for which this slot has been allocated */
+ /** Allocation id for which this slot has been allocated. */
private AllocationID allocationId;
- /** Assigned slot request if there is currently an ongoing request */
+ /** Assigned slot request if there is currently an ongoing request. */
private PendingSlotRequest assignedSlotRequest;
private State state;
@@ -139,6 +139,9 @@ public class TaskManagerSlot {
return resourceProfile.isMatching(required);
}
+ /**
+ * State of the {@link TaskManagerSlot}.
+ */
public enum State {
FREE,
PENDING,
http://git-wip-us.apache.org/repos/asf/flink/blob/492d1b45/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 7326719..1e6f810 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -63,54 +63,54 @@ import java.util.concurrent.TimeoutException;
* are not enough slots available the slot manager will notify the resource manager about it via
* {@link ResourceActions#allocateResource(ResourceProfile)}.
*
- * In order to free resources and avoid resource leaks, idling task managers (task managers whose
+ * <p>In order to free resources and avoid resource leaks, idling task managers (task managers whose
* slots are currently not used) and pending slot requests time out triggering their release and
* failure, respectively.
*/
public class SlotManager implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
- /** Scheduled executor for timeouts */
+ /** Scheduled executor for timeouts. */
private final ScheduledExecutor scheduledExecutor;
- /** Timeout for slot requests to the task manager */
+ /** Timeout for slot requests to the task manager. */
private final Time taskManagerRequestTimeout;
- /** Timeout after which an allocation is discarded */
+ /** Timeout after which an allocation is discarded. */
private final Time slotRequestTimeout;
- /** Timeout after which an unused TaskManager is released */
+ /** Timeout after which an unused TaskManager is released. */
private final Time taskManagerTimeout;
- /** Map for all registered slots */
+ /** Map for all registered slots. */
private final HashMap<SlotID, TaskManagerSlot> slots;
- /** Index of all currently free slots */
+ /** Index of all currently free slots. */
private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
- /** All currently registered task managers */
+ /** All currently registered task managers. */
private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
- /** Map of fulfilled and active allocations for request deduplication purposes */
+ /** Map of fulfilled and active allocations for request deduplication purposes. */
private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
- /** Map of pending/unfulfilled slot allocation requests */
+ /** Map of pending/unfulfilled slot allocation requests. */
private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
- /** ResourceManager's id */
+ /** ResourceManager's id. */
private ResourceManagerId resourceManagerId;
- /** Executor for future callbacks which have to be "synchronized" */
+ /** Executor for future callbacks which have to be "synchronized". */
private Executor mainThreadExecutor;
- /** Callbacks for resource (de-)allocations */
+ /** Callbacks for resource (de-)allocations. */
private ResourceActions resourceActions;
private ScheduledFuture<?> taskManagerTimeoutCheck;
private ScheduledFuture<?> slotRequestTimeoutCheck;
- /** True iff the component has been started */
+ /** True iff the component has been started. */
private boolean started;
public SlotManager(
@@ -186,29 +186,19 @@ public class SlotManager implements AutoCloseable {
started = true;
- taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- mainThreadExecutor.execute(new Runnable() {
- @Override
- public void run() {
- checkTaskManagerTimeouts();
- }
- });
- }
- }, 0L, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
- slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- mainThreadExecutor.execute(new Runnable() {
- @Override
- public void run() {
- checkSlotRequestTimeouts();
- }
- });
- }
- }, 0L, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
+ () -> mainThreadExecutor.execute(
+ () -> checkTaskManagerTimeouts()),
+ 0L,
+ taskManagerTimeout.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+
+ slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
+ () -> mainThreadExecutor.execute(
+ () -> checkSlotRequestTimeouts()),
+ 0L,
+ slotRequestTimeout.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
}
/**
@@ -388,6 +378,8 @@ public class SlotManager implements AutoCloseable {
public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
checkInit();
+ LOG.info("Received slot report from instance {}.", instanceId);
+
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
if (null != taskManagerRegistration) {
@@ -448,7 +440,7 @@ public class SlotManager implements AutoCloseable {
* Finds a matching slot request for a given resource profile. If there is no such request,
* the method returns null.
*
- * Note: If you want to change the behaviour of the slot manager wrt slot allocation and
+ * <p>Note: If you want to change the behaviour of the slot manager wrt slot allocation and
* request fulfillment, then you should override this method.
*
* @param slotResourceProfile defining the resources of an available slot
@@ -471,7 +463,7 @@ public class SlotManager implements AutoCloseable {
* resources available as the given resource profile. If there is no such slot available, then
* the method returns null.
*
- * Note: If you want to change the behaviour of the slot manager wrt slot allocation and
+ * <p>Note: If you want to change the behaviour of the slot manager wrt slot allocation and
* request fulfillment, then you should override this method.
*
* @param requestResourceProfile specifying the resource requirements for the a slot request
@@ -485,7 +477,10 @@ public class SlotManager implements AutoCloseable {
TaskManagerSlot taskManagerSlot = iterator.next().getValue();
// sanity check
- Preconditions.checkState(taskManagerSlot.getState() == TaskManagerSlot.State.FREE);
+ Preconditions.checkState(
+ taskManagerSlot.getState() == TaskManagerSlot.State.FREE,
+ "TaskManagerSlot %s is not in state FREE but %s.",
+ taskManagerSlot.getSlotId(), taskManagerSlot.getState());
if (taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) {
iterator.remove();
@@ -595,6 +590,8 @@ public class SlotManager implements AutoCloseable {
}
break;
case FREE:
+ // the slot is currently free --> it is stored in freeSlots
+ freeSlots.remove(slot.getSlotId());
slot.updateAllocation(allocationId);
taskManagerRegistration.occupySlot();
break;
http://git-wip-us.apache.org/repos/asf/flink/blob/492d1b45/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 67a8cb9..4907756 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnect
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.testutils.category.Flip6;
@@ -53,10 +54,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -1070,6 +1074,66 @@ public class SlotManagerTest extends TestLogger {
}
}
+ /**
+ * Tests that free slots which are reported as allocated won't be considered for fulfilling
+ * other pending slot requests.
+ *
+ * <p>See: FLINK-8505
+ */
+ @Test
+ public void testReportAllocatedSlot() throws Exception {
+ final ResourceActions resourceActions = mock(ResourceActions.class);
+ final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final ResourceID taskManagerId = ResourceID.generate();
+
+ try (final SlotManager slotManager = new SlotManager(
+ TestingUtils.defaultScheduledExecutor(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime())) {
+
+ slotManager.start(ResourceManagerId.generate(), Executors.directExecutor(), resourceActions);
+
+ // initially report a single slot as free
+ final SlotID slotId = new SlotID(taskManagerId, 0);
+ final SlotStatus initialSlotStatus = new SlotStatus(
+ slotId,
+ ResourceProfile.UNKNOWN);
+ final SlotReport initialSlotReport = new SlotReport(initialSlotStatus);
+
+ slotManager.registerTaskManager(taskExecutorConnection, initialSlotReport);
+
+ assertThat(slotManager.getNumberRegisteredSlots(), is(equalTo(1)));
+
+ // Now report this slot as allocated
+ final SlotStatus slotStatus = new SlotStatus(
+ slotId,
+ ResourceProfile.UNKNOWN,
+ new JobID(),
+ new AllocationID());
+ final SlotReport slotReport = new SlotReport(
+ slotStatus);
+
+ slotManager.reportSlotStatus(
+ taskExecutorConnection.getInstanceID(),
+ slotReport);
+
+ // this slot request should not be fulfilled
+ final AllocationID allocationId = new AllocationID();
+ final SlotRequest slotRequest = new SlotRequest(
+ new JobID(),
+ allocationId,
+ ResourceProfile.UNKNOWN,
+ "foobar");
+
+ // This triggered an IllegalStateException before
+ slotManager.registerSlotRequest(slotRequest);
+
+ assertThat(slotManager.getSlotRequest(allocationId).isAssigned(), is(false));
+ }
+ }
+
private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
SlotManager slotManager = new SlotManager(
TestingUtils.defaultScheduledExecutor(),