You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/30 09:49:59 UTC

[6/7] flink git commit: [FLINK-8505] [flip6] Prevent SlotManager from reaching an inconsistent state

[FLINK-8505] [flip6] Prevent SlotManager from reaching an inconsistent state

The SlotManager could reach an inconsistent state when a formerly free slot was reported
as allocated by an incoming SlotReport. This state transition did not remove the slot
from the set of free slots. As a consequence, the now-allocated slot would still be considered
for future SlotRequests, which tripped the free-slot sanity check and failed with an
IllegalStateException.

The problem is solved by removing a slot from the set of free slots when a SlotReport
updates it from free to allocated.

This closes #5354.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/492d1b45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/492d1b45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/492d1b45

Branch: refs/heads/master
Commit: 492d1b45c283c6611added2b1c750bb2b014d23c
Parents: 03d042a
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jan 24 18:32:47 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 29 17:05:57 2018 +0100

----------------------------------------------------------------------
 .../clusterframework/types/TaskManagerSlot.java | 13 ++--
 .../slotmanager/SlotManager.java                | 77 ++++++++++----------
 .../slotmanager/SlotManagerTest.java            | 64 ++++++++++++++++
 3 files changed, 109 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/492d1b45/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
index f92f9ac..fb7fce3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
@@ -33,19 +33,19 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TaskManagerSlot {
 
-	/** The unique identification of this slot */
+	/** The unique identification of this slot. */
 	private final SlotID slotId;
 
-	/** The resource profile of this slot */
+	/** The resource profile of this slot. */
 	private final ResourceProfile resourceProfile;
 
-	/** Gateway to the TaskExecutor which owns the slot */
+	/** Gateway to the TaskExecutor which owns the slot. */
 	private final TaskExecutorConnection taskManagerConnection;
 
-	/** Allocation id for which this slot has been allocated */
+	/** Allocation id for which this slot has been allocated. */
 	private AllocationID allocationId;
 
-	/** Assigned slot request if there is currently an ongoing request */
+	/** Assigned slot request if there is currently an ongoing request. */
 	private PendingSlotRequest assignedSlotRequest;
 
 	private State state;
@@ -139,6 +139,9 @@ public class TaskManagerSlot {
 		return resourceProfile.isMatching(required);
 	}
 
+	/**
+	 * State of the {@link TaskManagerSlot}.
+	 */
 	public enum State {
 		FREE,
 		PENDING,

http://git-wip-us.apache.org/repos/asf/flink/blob/492d1b45/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 7326719..1e6f810 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -63,54 +63,54 @@ import java.util.concurrent.TimeoutException;
  * are not enough slots available the slot manager will notify the resource manager about it via
  * {@link ResourceActions#allocateResource(ResourceProfile)}.
  *
- * In order to free resources and avoid resource leaks, idling task managers (task managers whose
+ * <p>In order to free resources and avoid resource leaks, idling task managers (task managers whose
  * slots are currently not used) and pending slot requests time out triggering their release and
  * failure, respectively.
  */
 public class SlotManager implements AutoCloseable {
 	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
 
-	/** Scheduled executor for timeouts */
+	/** Scheduled executor for timeouts. */
 	private final ScheduledExecutor scheduledExecutor;
 
-	/** Timeout for slot requests to the task manager */
+	/** Timeout for slot requests to the task manager. */
 	private final Time taskManagerRequestTimeout;
 
-	/** Timeout after which an allocation is discarded */
+	/** Timeout after which an allocation is discarded. */
 	private final Time slotRequestTimeout;
 
-	/** Timeout after which an unused TaskManager is released */
+	/** Timeout after which an unused TaskManager is released. */
 	private final Time taskManagerTimeout;
 
-	/** Map for all registered slots */
+	/** Map for all registered slots. */
 	private final HashMap<SlotID, TaskManagerSlot> slots;
 
-	/** Index of all currently free slots */
+	/** Index of all currently free slots. */
 	private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
 
-	/** All currently registered task managers */
+	/** All currently registered task managers. */
 	private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
 
-	/** Map of fulfilled and active allocations for request deduplication purposes */
+	/** Map of fulfilled and active allocations for request deduplication purposes. */
 	private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
 
-	/** Map of pending/unfulfilled slot allocation requests */
+	/** Map of pending/unfulfilled slot allocation requests. */
 	private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
 
-	/** ResourceManager's id */
+	/** ResourceManager's id. */
 	private ResourceManagerId resourceManagerId;
 
-	/** Executor for future callbacks which have to be "synchronized" */
+	/** Executor for future callbacks which have to be "synchronized". */
 	private Executor mainThreadExecutor;
 
-	/** Callbacks for resource (de-)allocations */
+	/** Callbacks for resource (de-)allocations. */
 	private ResourceActions resourceActions;
 
 	private ScheduledFuture<?> taskManagerTimeoutCheck;
 
 	private ScheduledFuture<?> slotRequestTimeoutCheck;
 
-	/** True iff the component has been started */
+	/** True iff the component has been started. */
 	private boolean started;
 
 	public SlotManager(
@@ -186,29 +186,19 @@ public class SlotManager implements AutoCloseable {
 
 		started = true;
 
-		taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
-			@Override
-			public void run() {
-				mainThreadExecutor.execute(new Runnable() {
-					@Override
-					public void run() {
-						checkTaskManagerTimeouts();
-					}
-				});
-			}
-		}, 0L, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-		slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
-			@Override
-			public void run() {
-				mainThreadExecutor.execute(new Runnable() {
-					@Override
-					public void run() {
-						checkSlotRequestTimeouts();
-					}
-				});
-			}
-		}, 0L, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
+			() -> mainThreadExecutor.execute(
+				() -> checkTaskManagerTimeouts()),
+			0L,
+			taskManagerTimeout.toMilliseconds(),
+			TimeUnit.MILLISECONDS);
+
+		slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
+			() -> mainThreadExecutor.execute(
+				() -> checkSlotRequestTimeouts()),
+			0L,
+			slotRequestTimeout.toMilliseconds(),
+			TimeUnit.MILLISECONDS);
 	}
 
 	/**
@@ -388,6 +378,8 @@ public class SlotManager implements AutoCloseable {
 	public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
 		checkInit();
 
+		LOG.info("Received slot report from instance {}.", instanceId);
+
 		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
 
 		if (null != taskManagerRegistration) {
@@ -448,7 +440,7 @@ public class SlotManager implements AutoCloseable {
 	 * Finds a matching slot request for a given resource profile. If there is no such request,
 	 * the method returns null.
 	 *
-	 * Note: If you want to change the behaviour of the slot manager wrt slot allocation and
+	 * <p>Note: If you want to change the behaviour of the slot manager wrt slot allocation and
 	 * request fulfillment, then you should override this method.
 	 *
 	 * @param slotResourceProfile defining the resources of an available slot
@@ -471,7 +463,7 @@ public class SlotManager implements AutoCloseable {
 	 * resources available as the given resource profile. If there is no such slot available, then
 	 * the method returns null.
 	 *
-	 * Note: If you want to change the behaviour of the slot manager wrt slot allocation and
+	 * <p>Note: If you want to change the behaviour of the slot manager wrt slot allocation and
 	 * request fulfillment, then you should override this method.
 	 *
 	 * @param requestResourceProfile specifying the resource requirements for the a slot request
@@ -485,7 +477,10 @@ public class SlotManager implements AutoCloseable {
 			TaskManagerSlot taskManagerSlot = iterator.next().getValue();
 
 			// sanity check
-			Preconditions.checkState(taskManagerSlot.getState() == TaskManagerSlot.State.FREE);
+			Preconditions.checkState(
+				taskManagerSlot.getState() == TaskManagerSlot.State.FREE,
+				"TaskManagerSlot %s is not in state FREE but %s.",
+				taskManagerSlot.getSlotId(), taskManagerSlot.getState());
 
 			if (taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) {
 				iterator.remove();
@@ -595,6 +590,8 @@ public class SlotManager implements AutoCloseable {
 					}
 					break;
 				case FREE:
+					// the slot is currently free --> it is stored in freeSlots
+					freeSlots.remove(slot.getSlotId());
 					slot.updateAllocation(allocationId);
 					taskManagerRegistration.occupySlot();
 					break;

http://git-wip-us.apache.org/repos/asf/flink/blob/492d1b45/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 67a8cb9..4907756 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnect
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.testutils.category.Flip6;
@@ -53,10 +54,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -1070,6 +1074,66 @@ public class SlotManagerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that free slots which are reported as allocated won't be considered for fulfilling
+	 * other pending slot requests.
+	 *
+	 * <p>See: FLINK-8505
+	 */
+	@Test
+	public void testReportAllocatedSlot() throws Exception {
+		final ResourceActions resourceActions = mock(ResourceActions.class);
+		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final ResourceID taskManagerId = ResourceID.generate();
+
+		try (final SlotManager slotManager = new SlotManager(
+			TestingUtils.defaultScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime())) {
+
+			slotManager.start(ResourceManagerId.generate(), Executors.directExecutor(), resourceActions);
+
+			// initially report a single slot as free
+			final SlotID slotId = new SlotID(taskManagerId, 0);
+			final SlotStatus initialSlotStatus = new SlotStatus(
+				slotId,
+				ResourceProfile.UNKNOWN);
+			final SlotReport initialSlotReport = new SlotReport(initialSlotStatus);
+
+			slotManager.registerTaskManager(taskExecutorConnection, initialSlotReport);
+
+			assertThat(slotManager.getNumberRegisteredSlots(), is(equalTo(1)));
+
+			// Now report this slot as allocated
+			final SlotStatus slotStatus = new SlotStatus(
+				slotId,
+				ResourceProfile.UNKNOWN,
+				new JobID(),
+				new AllocationID());
+			final SlotReport slotReport = new SlotReport(
+				slotStatus);
+
+			slotManager.reportSlotStatus(
+				taskExecutorConnection.getInstanceID(),
+				slotReport);
+
+			// this slot request should not be fulfilled
+			final AllocationID allocationId = new AllocationID();
+			final SlotRequest slotRequest = new SlotRequest(
+				new JobID(),
+				allocationId,
+				ResourceProfile.UNKNOWN,
+				"foobar");
+
+			// This triggered an IllegalStateException before
+			slotManager.registerSlotRequest(slotRequest);
+
+			assertThat(slotManager.getSlotRequest(allocationId).isAssigned(), is(false));
+		}
+	}
+
 	private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
 		SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),