You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/18 14:34:44 UTC

[flink] branch release-1.5 updated (d2034ca -> acb8804)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d2034ca  [FLINK-10222] [table] Fix parsing of keywords.
     new 5f14dac  [FLINK-9884] [runtime] fix slot request may not be removed when it has already be assigned in slot manager
     new acb8804  [hotfix] Fix checkstyle violations in SlotManagerTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../resourcemanager/slotmanager/SlotManager.java   |  3 +
 .../slotmanager/SlotManagerTest.java               | 79 ++++++++++++++++++++--
 2 files changed, 75 insertions(+), 7 deletions(-)


[flink] 01/02: [FLINK-9884] [runtime] fix slot request may not be removed when it has already be assigned in slot manager

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5f14dac2ee274b892b8a93541fd5a8ba054951d2
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Wed Jul 18 15:54:55 2018 +0800

    [FLINK-9884] [runtime] fix slot request may not be removed when it has already be assigned in slot manager
---
 .../resourcemanager/slotmanager/SlotManager.java   |  3 +
 .../slotmanager/SlotManagerTest.java               | 66 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d74979a..f207a36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -592,6 +592,9 @@ public class SlotManager implements AutoCloseable {
 						// set the allocation id such that the slot won't be considered for the pending slot request
 						slot.updateAllocation(allocationId, jobId);
 
+						// remove the pending request if any as it has been assigned
+						pendingSlotRequests.remove(allocationId);
+
 						// this will try to find a new slot for the request
 						rejectPendingSlotRequest(
 							pendingSlotRequest,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index fb82aa5..cfca017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -1217,6 +1218,71 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that pending request is removed if task executor reports a slot with its allocation id.
+	 */
+	@Test
+	public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(),
+				new TestingResourceActionsBuilder().createTestingResourceActions())) {
+
+			final JobID jobID = new JobID();
+			final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+			slotManager.registerSlotRequest(slotRequest1);
+
+			final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
+			final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
+
+			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+					.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
+						requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
+						try {
+							return responseQueue.take();
+						} catch (InterruptedException ignored) {
+							return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted."));
+						}
+					})
+					.createTestingTaskExecutorGateway();
+
+			final ResourceID taskExecutorResourceId = ResourceID.generate();
+			final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
+			final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
+
+			final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
+			responseQueue.offer(firstManualSlotRequestResponse);
+
+			slotManager.registerTaskManager(taskExecutionConnection, slotReport);
+
+			final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> firstRequest = requestSlotQueue.take();
+
+			final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
+			responseQueue.offer(secondManualSlotRequestResponse);
+
+			final SlotRequest slotRequest2 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+			slotManager.registerSlotRequest(slotRequest2);
+
+			// fail first request
+			firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
+
+			final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
+
+			// fail second request
+			secondManualSlotRequestResponse.completeExceptionally(new SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
+
+			assertThat(firstRequest.f2, equalTo(slotRequest1.getAllocationId()));
+			assertThat(secondRequest.f2, equalTo(slotRequest2.getAllocationId()));
+			assertThat(secondRequest.f0, equalTo(firstRequest.f0));
+
+			secondManualSlotRequestResponse.complete(Acknowledge.get());
+
+			final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
+			assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
+			assertThat(slot.getAllocationId(), equalTo(firstRequest.f2));
+
+			assertThat(slotManager.getNumberRegisteredSlots(), is(1));
+		}
+	}
+
+	/**
 	 * Tests notify the job manager of the allocations when the task manager is failed/killed.
 	 */
 	@Test


[flink] 02/02: [hotfix] Fix checkstyle violations in SlotManagerTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit acb88045ffd8caa68a22d2e7860715419208591e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Sep 18 11:35:44 2018 +0200

    [hotfix] Fix checkstyle violations in SlotManagerTest
---
 .../resourcemanager/slotmanager/SlotManagerTest.java        | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index cfca017..8a7f733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -119,7 +119,7 @@ public class SlotManagerTest extends TestLogger {
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
-			assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots());
+			assertTrue("The number registered slots does not equal the expected number.", 2 == slotManager.getNumberRegisteredSlots());
 
 			assertNotNull(slotManager.getSlot(slotId1));
 			assertNotNull(slotManager.getSlot(slotId2));
@@ -165,7 +165,7 @@ public class SlotManagerTest extends TestLogger {
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
-			assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots());
+			assertTrue("The number registered slots does not equal the expected number.", 2 == slotManager.getNumberRegisteredSlots());
 
 			TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
 			TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
@@ -190,7 +190,7 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that a slot request with no free slots will trigger the resource allocation
+	 * Tests that a slot request with no free slots will trigger the resource allocation.
 	 */
 	@Test
 	public void testSlotRequestWithoutFreeSlots() throws Exception {
@@ -393,7 +393,7 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that freeing a slot will correctly reset the slot and mark it as a free slot
+	 * Tests that freeing a slot will correctly reset the slot and mark it as a free slot.
 	 */
 	@Test
 	public void testFreeSlot() throws Exception {
@@ -633,7 +633,6 @@ public class SlotManagerTest extends TestLogger {
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 
-
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
 		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
@@ -767,7 +766,7 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that a slot request is retried if it times out on the task manager side
+	 * Tests that a slot request is retried if it times out on the task manager side.
 	 */
 	@Test
 	@SuppressWarnings("unchecked")
@@ -1062,7 +1061,7 @@ public class SlotManagerTest extends TestLogger {
 	 * A timeout should only trigger the {@link ResourceActions#releaseResource(InstanceID, Exception)}
 	 * callback. The receiver of the callback can then decide what to do with the TaskManager.
 	 *
-	 * FLINK-7793
+	 * <p>See FLINK-7793
 	 */
 	@Test
 	public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {