You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/18 14:32:53 UTC

[flink] branch master updated (09abba3 -> 8d842e3)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 09abba3  [hotfix] Fix checkstyle violations in SlotManager
     new 48e724f  [FLINK-9884] [runtime] fix slot request may not be removed when it has already been assigned in slot manager
     new 8d842e3  [hotfix] Fix checkstyle violations in SlotManagerTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../resourcemanager/slotmanager/SlotManager.java   |  3 +
 .../slotmanager/SlotManagerTest.java               | 79 ++++++++++++++++++++--
 2 files changed, 75 insertions(+), 7 deletions(-)


[flink] 02/02: [hotfix] Fix checkstyle violations in SlotManagerTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8d842e39d4b2ba3a196cb508b4637e3432fc8efc
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Sep 18 11:35:44 2018 +0200

    [hotfix] Fix checkstyle violations in SlotManagerTest
---
 .../resourcemanager/slotmanager/SlotManagerTest.java        | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index cfca017..8a7f733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -119,7 +119,7 @@ public class SlotManagerTest extends TestLogger {
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
-			assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots());
+			assertTrue("The number registered slots does not equal the expected number.", 2 == slotManager.getNumberRegisteredSlots());
 
 			assertNotNull(slotManager.getSlot(slotId1));
 			assertNotNull(slotManager.getSlot(slotId2));
@@ -165,7 +165,7 @@ public class SlotManagerTest extends TestLogger {
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
-			assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots());
+			assertTrue("The number registered slots does not equal the expected number.", 2 == slotManager.getNumberRegisteredSlots());
 
 			TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
 			TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
@@ -190,7 +190,7 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that a slot request with no free slots will trigger the resource allocation
+	 * Tests that a slot request with no free slots will trigger the resource allocation.
 	 */
 	@Test
 	public void testSlotRequestWithoutFreeSlots() throws Exception {
@@ -393,7 +393,7 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that freeing a slot will correctly reset the slot and mark it as a free slot
+	 * Tests that freeing a slot will correctly reset the slot and mark it as a free slot.
 	 */
 	@Test
 	public void testFreeSlot() throws Exception {
@@ -633,7 +633,6 @@ public class SlotManagerTest extends TestLogger {
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 
-
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
 		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
@@ -767,7 +766,7 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that a slot request is retried if it times out on the task manager side
+	 * Tests that a slot request is retried if it times out on the task manager side.
 	 */
 	@Test
 	@SuppressWarnings("unchecked")
@@ -1062,7 +1061,7 @@ public class SlotManagerTest extends TestLogger {
 	 * A timeout should only trigger the {@link ResourceActions#releaseResource(InstanceID, Exception)}
 	 * callback. The receiver of the callback can then decide what to do with the TaskManager.
 	 *
-	 * FLINK-7793
+	 * <p>See FLINK-7793
 	 */
 	@Test
 	public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {


[flink] 01/02: [FLINK-9884] [runtime] fix slot request may not be removed when it has already been assigned in slot manager

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 48e724fed60d10a11e3cc39e9c6e964002a926b3
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Wed Jul 18 15:54:55 2018 +0800

    [FLINK-9884] [runtime] fix slot request may not be removed when it has already been assigned in slot manager
    
    This closes #6360.
---
 .../resourcemanager/slotmanager/SlotManager.java   |  3 +
 .../slotmanager/SlotManagerTest.java               | 66 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f9c8c0e..d54d143 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -594,6 +594,9 @@ public class SlotManager implements AutoCloseable {
 						// set the allocation id such that the slot won't be considered for the pending slot request
 						slot.updateAllocation(allocationId, jobId);
 
+						// remove the pending request if any as it has been assigned
+						pendingSlotRequests.remove(allocationId);
+
 						// this will try to find a new slot for the request
 						rejectPendingSlotRequest(
 							pendingSlotRequest,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index fb82aa5..cfca017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -1217,6 +1218,71 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that pending request is removed if task executor reports a slot with its allocation id.
+	 */
+	@Test
+	public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(),
+				new TestingResourceActionsBuilder().createTestingResourceActions())) {
+
+			final JobID jobID = new JobID();
+			final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+			slotManager.registerSlotRequest(slotRequest1);
+
+			final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
+			final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
+
+			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+					.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
+						requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
+						try {
+							return responseQueue.take();
+						} catch (InterruptedException ignored) {
+							return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted."));
+						}
+					})
+					.createTestingTaskExecutorGateway();
+
+			final ResourceID taskExecutorResourceId = ResourceID.generate();
+			final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
+			final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
+
+			final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
+			responseQueue.offer(firstManualSlotRequestResponse);
+
+			slotManager.registerTaskManager(taskExecutionConnection, slotReport);
+
+			final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> firstRequest = requestSlotQueue.take();
+
+			final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
+			responseQueue.offer(secondManualSlotRequestResponse);
+
+			final SlotRequest slotRequest2 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+			slotManager.registerSlotRequest(slotRequest2);
+
+			// fail first request
+			firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
+
+			final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
+
+			// fail second request
+			secondManualSlotRequestResponse.completeExceptionally(new SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
+
+			assertThat(firstRequest.f2, equalTo(slotRequest1.getAllocationId()));
+			assertThat(secondRequest.f2, equalTo(slotRequest2.getAllocationId()));
+			assertThat(secondRequest.f0, equalTo(firstRequest.f0));
+
+			secondManualSlotRequestResponse.complete(Acknowledge.get());
+
+			final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
+			assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
+			assertThat(slot.getAllocationId(), equalTo(firstRequest.f2));
+
+			assertThat(slotManager.getNumberRegisteredSlots(), is(1));
+		}
+	}
+
+	/**
 	 * Tests notify the job manager of the allocations when the task manager is failed/killed.
 	 */
 	@Test