Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/21 12:24:54 UTC

[GitHub] [flink] zhuzhurk opened a new pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

zhuzhurk opened a new pull request #12278:
URL: https://github.com/apache/flink/pull/12278


   ## What is the purpose of the change
   
   This PR fulfills slot requests in the order in which they were requested.
   This avoids slot competition between slot allocation bulks, which can lead to resource deadlocks.
   
   ## Brief change log
   
     - *Fulfill slot requests in request order when slots are offered*
     - *Remap an orphaned slot allocation to the pending slot request that lost its own allocation, so that the request can fail fast on allocation failures*
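
For illustration only, here is a minimal sketch of what "fulfill in request order" could look like. It is not the code of this PR: `RequestOrderSlotPoolSketch`, its string request ids and its integer "sizes" are hypothetical stand-ins for the slot pool, `SlotRequestId` and the resource profile. The only assumption it relies on is that pending requests are kept in an insertion-ordered map and that an offered slot goes to the earliest pending request it can satisfy.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a slot pool; this is not Flink's SlotPoolImpl. */
class RequestOrderSlotPoolSketch {

    // LinkedHashMap iterates in insertion order, which here means request order.
    private final Map<String, Integer> pendingRequests = new LinkedHashMap<>();

    /** Registers a pending request that needs a slot of at least requiredSize. */
    void requestSlot(String requestId, int requiredSize) {
        pendingRequests.put(requestId, requiredSize);
    }

    /** Hands the offered slot to the earliest pending request it can satisfy. */
    String offerSlot(int offeredSize) {
        Iterator<Map.Entry<String, Integer>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            if (entry.getValue() <= offeredSize) {
                String requestId = entry.getKey(); // read the key before removing the entry
                it.remove();
                return requestId;
            }
        }
        return null; // no pending request matches this slot
    }

    public static void main(String[] args) {
        RequestOrderSlotPoolSketch pool = new RequestOrderSlotPoolSketch();
        pool.requestSlot("request-1", 1);
        pool.requestSlot("request-2", 1);
        // Whichever slot arrives first is given to the request that was made first.
        System.out.println(pool.offerSlot(1));
        System.out.println(pool.offerSlot(1));
    }
}
```

Because the earliest request wins regardless of which allocation the slot was originally requested for, a later bulk cannot take slots away from an earlier one in this model.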
   
   ## Verifying this change
   
     - *Modified SlotPoolRequestCompletionTest*
     - *Added SlotPoolPendingRequestFailureTest#testFailingAllocationFailsRemappedPendingSlotRequests*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 13ae7d728e12b1f349d1cbb1d555882e5d29b298 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3605) 
   * 4f5b21017451ccae3811069e6534f3fa8aeaf1ad Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3685) 
   * ab11f37b0a46e1cf35434b55eb08bda539f16258 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441577695



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
 		}
 	}
 
+	@Test
+	public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+		final List<AllocationID> allocations = new ArrayList<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+		try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
       Sure, I agree that we can do the overall test cleanup as a separate issue if it is too much for this PR.







[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441359953



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
 		}
 	}
 
+	@Test
+	public void testOrphanedAllocationCanBeRemapped() throws Exception {
+		try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+			final List<AllocationID> allocationIds = new ArrayList<>();
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+			final List<AllocationID> canceledAllocations = new ArrayList<>();
+			resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+			setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+			final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

Review comment:
       Why do we use `Scheduler` to unit test `SlotPoolImpl`?
   Why not call `SlotPoolImpl` directly, as in `testFailingAllocationFailsRemappedPendingSlotRequests`?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
 		}
 	}
 
+	@Test
+	public void testOrphanedAllocationCanBeRemapped() throws Exception {
+		try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+			final List<AllocationID> allocationIds = new ArrayList<>();
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+			final List<AllocationID> canceledAllocations = new ArrayList<>();
+			resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+			setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+			final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
+
+			final SlotRequestId slotRequestId1 = new SlotRequestId();
+			scheduler.allocateSlot(
+				slotRequestId1,
+				new DummyScheduledUnit(),
+				SlotProfile.noRequirements(),
+				timeout);

Review comment:
       ```suggestion
   			allocateSlot(scheduler, slotRequestId1);
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
 		}
 	}
 
+	@Test
+	public void testOrphanedAllocationCanBeRemapped() throws Exception {
+		try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+			final List<AllocationID> allocationIds = new ArrayList<>();
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+			final List<AllocationID> canceledAllocations = new ArrayList<>();
+			resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

Review comment:
       nit: maybe not now, but if it can also be reused in other tests, it would be nice to have something like an RM harness:
   ```
   class RmHarness {
   final List<AllocationID> allocationIds = new ArrayList<>();
   final List<AllocationID> canceledAllocations = new ArrayList<>();
   RmHarness(resourceManagerGateway)
   getAllocations
   getCanceled
   }
   ```
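
A self-contained sketch of how such a harness could be filled in follows. `FakeResourceManagerGateway` is a hypothetical stand-in that models only the two setters used in the diff above, and plain strings stand in for `AllocationID`; in the real test the request consumer receives a slot request and would call `getAllocationId()` on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the test gateway; only the two hooks from the diff are modeled. */
class FakeResourceManagerGateway {
    private Consumer<String> requestSlotConsumer = id -> { };
    private Consumer<String> cancelSlotConsumer = id -> { };

    void setRequestSlotConsumer(Consumer<String> consumer) {
        this.requestSlotConsumer = consumer;
    }

    void setCancelSlotConsumer(Consumer<String> consumer) {
        this.cancelSlotConsumer = consumer;
    }

    void requestSlot(String allocationId) {
        requestSlotConsumer.accept(allocationId);
    }

    void cancelSlotRequest(String allocationId) {
        cancelSlotConsumer.accept(allocationId);
    }
}

/** Records requested and canceled allocations so that tests do not wire the lists by hand. */
class RmHarness {
    private final List<String> allocationIds = new ArrayList<>();
    private final List<String> canceledAllocations = new ArrayList<>();

    RmHarness(FakeResourceManagerGateway resourceManagerGateway) {
        resourceManagerGateway.setRequestSlotConsumer(allocationIds::add);
        resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
    }

    List<String> getAllocations() {
        return allocationIds;
    }

    List<String> getCanceled() {
        return canceledAllocations;
    }

    public static void main(String[] args) {
        FakeResourceManagerGateway gateway = new FakeResourceManagerGateway();
        RmHarness harness = new RmHarness(gateway);
        gateway.requestSlot("allocation-1");
        gateway.cancelSlotRequest("allocation-1");
        System.out.println(harness.getAllocations() + " " + harness.getCanceled());
    }
}
```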

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
 		}
 	}
 
+	@Test
+	public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+		final List<AllocationID> allocations = new ArrayList<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+		try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
       Could we also reuse/deduplicate `setUpSlotPool` in `SlotPoolImplTest`?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
 		}
 	}
 
+	@Test
+	public void testOrphanedAllocationCanBeRemapped() throws Exception {
+		try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+			final List<AllocationID> allocationIds = new ArrayList<>();
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+			final List<AllocationID> canceledAllocations = new ArrayList<>();
+			resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+			setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+			final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
+
+			final SlotRequestId slotRequestId1 = new SlotRequestId();
+			scheduler.allocateSlot(
+				slotRequestId1,
+				new DummyScheduledUnit(),
+				SlotProfile.noRequirements(),
+				timeout);

Review comment:
       There is already a private method `SlotPoolImplTest#allocateSlot` for this.
   I would also extend this private method to submit multiple allocations at once in order:
   `allocateSlot(Scheduler scheduler, SlotRequestId ... slotRequestIds)`
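
As a rough sketch of the varargs idea, with hypothetical stand-ins (`RecordingScheduler` for `Scheduler`, strings for `SlotRequestId`): the helper submits one allocation per id, in argument order. The real helper would also pass the scheduled unit, slot profile and timeout seen in the diff above.

```java
import java.util.ArrayList;
import java.util.List;

class AllocateSlotHelperSketch {

    /** Hypothetical stand-in for the scheduler; it only records the order of allocation calls. */
    static class RecordingScheduler {
        final List<String> requested = new ArrayList<>();

        void allocateSlot(String slotRequestId) {
            requested.add(slotRequestId);
        }
    }

    /** Submits one allocation per id, in the order in which the ids are passed. */
    static void allocateSlot(RecordingScheduler scheduler, String... slotRequestIds) {
        for (String slotRequestId : slotRequestIds) {
            scheduler.allocateSlot(slotRequestId);
        }
    }

    public static void main(String[] args) {
        RecordingScheduler scheduler = new RecordingScheduler();
        allocateSlot(scheduler, "request-1", "request-2", "request-3");
        System.out.println(scheduler.requested);
    }
}
```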

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
 		}
 	}
 
+	@Test
+	public void testOrphanedAllocationCanBeRemapped() throws Exception {
+		try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+			final List<AllocationID> allocationIds = new ArrayList<>();
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+			final List<AllocationID> canceledAllocations = new ArrayList<>();
+			resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

Review comment:
       It can be partially reused in `testFailingAllocationFailsRemappedPendingSlotRequests`.







[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441400235



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
 		}
 	}
 
+	@Test
+	public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+		final List<AllocationID> allocations = new ArrayList<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+		try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
       It's possible but unrelated. I do not think we should clean up the tests here.
   As you said, the scheduler might not be needed either. Fully reworking the `SlotPoolImpl` tests, including those in `SlotPoolImplTest`, `SlotPoolInteractionsTest`, and `SlotPoolSlotSharingTest`, is a larger effort and would be better handled as a separate task.







[GitHub] [flink] zhuzhurk removed a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk removed a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-642479713


   @flinkbot run azure





[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441346884



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -37,13 +41,13 @@
 
 	private final LinkedHashMap<A, Tuple2<B, V>> aMap;
 
-	private final LinkedHashMap<B, A> bMap;
+	private final HashMap<B, A> bMap;

Review comment:
       Hmm, to me, the order would not be obvious anyway without looking at either the implementation or the Javadoc comment. I am not strictly opposed to the change, but I would avoid it if it is not strictly needed.







[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441430620



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
 		}
 	}
 
+	@Test
+	public void testOrphanedAllocationCanBeRemapped() throws Exception {
+		try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+			final List<AllocationID> allocationIds = new ArrayList<>();
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+			final List<AllocationID> canceledAllocations = new ArrayList<>();
+			resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

Review comment:
       Maybe later, along with the rework of all SlotPool tests? `resourceManagerGateway` is shared between all test cases, so it is better to have an overview of its usages first.







[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441343582



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
 		return null;
 	}
 
+	private void maybeRemapOrphanedAllocation(
+			final AllocationID allocationIdOfRequest,
+			final AllocationID allocationIdOfSlot) {
+
+		final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot)
+			? null : allocationIdOfRequest;
+
+		// if the request that initiated the allocation is still pending, it should take over the orphaned allocation
+		// of the fulfilled request so that it can fail fast if the remapped allocation fails
+		if (orphanedAllocationId != null) {
+			final SlotRequestId requestIdOfAllocatedSlot = pendingRequests.getKeyA(allocationIdOfSlot);
+			if (requestIdOfAllocatedSlot != null) {
+				final PendingRequest requestOfAllocatedSlot = pendingRequests.getByKeyA(requestIdOfAllocatedSlot);
+				requestOfAllocatedSlot.setAllocationId(orphanedAllocationId);
+
+				// this re-insertion of initiatedRequestId will not affect its original insertion order
+				pendingRequests.put(requestIdOfAllocatedSlot, orphanedAllocationId, requestOfAllocatedSlot);
+			} else {
+				// cancel the slot request if the orphaned allocation is not remapped to a pending request.
+				// the request id can be null if the slot is returned by scheduler
+				resourceManagerGateway.cancelSlotRequest(orphanedAllocationId);

Review comment:
       Alright, so this is something like releasing the slot at the RM at the same time.
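
To make the two branches of the hunk above easier to follow, here is a simplified, self-contained model of the remapping. It is only a sketch under stated assumptions: `OrphanedAllocationRemapSketch` is hypothetical, strings stand in for `SlotRequestId` and `AllocationID`, a plain `LinkedHashMap` replaces the dual-key map, and a list records what would be sent to `resourceManagerGateway.cancelSlotRequest`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the remapping logic; not the PR's implementation. */
class OrphanedAllocationRemapSketch {

    // request id -> allocation id the request currently waits on, kept in request order
    final Map<String, String> pendingRequests = new LinkedHashMap<>();

    // allocations that would be canceled at the resource manager
    final List<String> canceledAtResourceManager = new ArrayList<>();

    void maybeRemapOrphanedAllocation(String allocationIdOfRequest, String allocationIdOfSlot) {
        if (allocationIdOfRequest.equals(allocationIdOfSlot)) {
            return; // the request was fulfilled by its own allocation; nothing is orphaned
        }
        String orphanedAllocationId = allocationIdOfRequest;
        String requestOfAllocatedSlot = findRequestWaitingOn(allocationIdOfSlot);
        if (requestOfAllocatedSlot != null) {
            // The request that initiated the used allocation is still pending: it takes over
            // the orphaned allocation. Re-putting an existing key keeps its insertion order.
            pendingRequests.put(requestOfAllocatedSlot, orphanedAllocationId);
        } else {
            // Nobody can take over the orphaned allocation, so it is canceled.
            canceledAtResourceManager.add(orphanedAllocationId);
        }
    }

    private String findRequestWaitingOn(String allocationId) {
        for (Map.Entry<String, String> entry : pendingRequests.entrySet()) {
            if (entry.getValue().equals(allocationId)) {
                return entry.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        OrphanedAllocationRemapSketch pool = new OrphanedAllocationRemapSketch();
        // request-1 (waiting on allocation-1) was just fulfilled with the slot of allocation-2,
        // so only request-2 is still pending.
        pool.pendingRequests.put("request-2", "allocation-2");
        pool.maybeRemapOrphanedAllocation("allocation-1", "allocation-2");
        System.out.println(pool.pendingRequests);
    }
}
```

In this model, if the orphaned allocation later fails, the failure can be routed to the request that took it over, which is the fail-fast behavior described in the code comment of the hunk.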







[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r435362084



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       There is also `UnfulfillableSlotRequestException`, which is still a fail-fast route if the RM finds that a certain request profile cannot be fulfilled by any existing slot and that no matching slot can be allocated. As far as I can see, it is relevant for batch, streaming, and bulk allocation alike. I do not know the whole background of this. At first glance it seems to be a complication, but it is probably necessary to avoid waiting for a timeout before canceling everything when it is already clear that the allocation can never succeed.







[GitHub] [flink] zhuzhurk removed a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk removed a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-633531282


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * e193166a23d99c7c43c123472a1b23534c383ec8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3239) 
   





[GitHub] [flink] zhuzhurk merged pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk merged pull request #12278:
URL: https://github.com/apache/flink/pull/12278


   





[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * a3a4256ccc6300fcac27d7625461c078a1510604 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2115) 
   





[GitHub] [flink] zhuzhurk commented on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-645745013


   Thanks for reviewing, @azagrebin.
   The same test failure happened again: the VM crashes with exit code 239. It is an existing issue (FLINK-18290) that is not related to this PR, so I will start merging it.





[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 9f69cd18655a8256d5bc91720a4a88f54b9a6ef9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3392) 
   * 13ae7d728e12b1f349d1cbb1d555882e5d29b298 UNKNOWN
   





[GitHub] [flink] zhuzhurk commented on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-642606221


   @flinkbot run azure


----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r440768678



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -37,13 +41,13 @@
 
 	private final LinkedHashMap<A, Tuple2<B, V>> aMap;
 
-	private final LinkedHashMap<B, A> bMap;
+	private final HashMap<B, A> bMap;

Review comment:
       It might cause confusion, so I think it would be better not to make it a `LinkedHashMap`.




----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-642479713


   @flinkbot run azure


----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436628282



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -112,6 +114,9 @@
 	/** The requests that are waiting for the resource manager to be connected. */
 	private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
 
+	/** Maps a request to its allocation. */
+	private final BiMap<SlotRequestId, AllocationID> requestedAllocations;

Review comment:
       Yes, we can improve `DualKeyLinkedMap` and drop `requestedAllocations`.
   The needed improvements would be:
   1. re-inserting a record does not affect its order
   2. getting keyA from keyB, and vice versa
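
The two improvements listed above could look roughly like the following. This is only a minimal sketch, not Flink's actual `DualKeyLinkedMap`: the class name `OrderedDualKeyMap` and the method names `getKeyAByKeyB` / `getKeyBByKeyA` are hypothetical, and keys are assumed to be non-null.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Set;

/** Hypothetical sketch of a map with an ordered primary key and a secondary key. */
final class OrderedDualKeyMap<A, B, V> {

    /** What is stored under a primary key: its secondary key and the value. */
    private static final class Entry<B, V> {
        final B keyB;
        final V value;

        Entry(B keyB, V value) {
            this.keyB = keyB;
            this.value = value;
        }
    }

    // Primary map; LinkedHashMap iterates in insertion order of the primary keys.
    private final LinkedHashMap<A, Entry<B, V>> aMap = new LinkedHashMap<>();
    // Reverse index; its order is irrelevant, so a plain HashMap is enough.
    private final HashMap<B, A> bMap = new HashMap<>();

    V put(A keyA, B keyB, V value) {
        // Keep the mapping one-to-one: evict another primary key that owns keyB.
        A previousOwner = bMap.get(keyB);
        if (previousOwner != null && !previousOwner.equals(keyA)) {
            aMap.remove(previousOwner);
        }
        // put() on an existing key replaces the entry but keeps its position.
        Entry<B, V> old = aMap.put(keyA, new Entry<>(keyB, value));
        if (old != null && !old.keyB.equals(keyB)) {
            bMap.remove(old.keyB);
        }
        bMap.put(keyB, keyA);
        return old == null ? null : old.value;
    }

    /** Improvement 2: look up one key from the other. */
    A getKeyAByKeyB(B keyB) {
        return bMap.get(keyB);
    }

    B getKeyBByKeyA(A keyA) {
        Entry<B, V> entry = aMap.get(keyA);
        return entry == null ? null : entry.keyB;
    }

    Set<A> keySetA() {
        return aMap.keySet();
    }
}
```

The sketch relies on `LinkedHashMap.put` never removing the primary key first, which is what keeps the position of a re-inserted record stable.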

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -112,6 +114,9 @@
 	/** The requests that are waiting for the resource manager to be connected. */
 	private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
 
+	/** Maps a request to its allocation. */
+	private final BiMap<SlotRequestId, AllocationID> requestedAllocations;

Review comment:
       done.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -37,20 +44,20 @@
 
 	private final LinkedHashMap<A, Tuple2<B, V>> aMap;
 
-	private final LinkedHashMap<B, A> bMap;
+	private final HashMap<B, A> bMap;
 
 	private transient Collection<V> values;
 
 	public DualKeyLinkedMap(int initialCapacity) {

Review comment:
       @azagrebin what do you think of making this class package-private?
   It looks like a common util class, but nothing other than `SlotPoolImpl` uses it.




----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997",
       "triggerID" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2115",
       "triggerID" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3011",
       "triggerID" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3239",
       "triggerID" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3321",
       "triggerID" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9f69cd18655a8256d5bc91720a4a88f54b9a6ef9",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3392",
       "triggerID" : "9f69cd18655a8256d5bc91720a4a88f54b9a6ef9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9f69cd18655a8256d5bc91720a4a88f54b9a6ef9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3392) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441412744



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
 		}
 	}
 
+	@Test
+	public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+		final List<AllocationID> allocations = new ArrayList<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+		try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
       I can open a task for it if you think that is OK.




----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436540565



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       On second thought, I would prefer not to make fulfilling requests in request order configurable.
   FIFO order works for any scheduling strategy and is even an improvement for lazy-from-source scheduling. There was a ticket with the same purpose, "[FLINK-13165] Complete slot requests in request order", although it did not fully achieve that in the end. It introduced the use of `LinkedHashMap`, which eases this PR, and `SlotPoolRequestCompletionTest`, which can be reused.
   This PR is more like a fix to FLINK-13165.
   
   The orphaned allocation remapping also ensures that the fail-fast route will not break with this change.
   
   What do you think?
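
To illustrate the FIFO idea, here is a minimal sketch of fulfilling pending requests in request order. It is not the `SlotPoolImpl` code: the class `RequestOrderMatcher`, the string request ids and the integer "size" standing in for a resource profile are all assumptions made for the example.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: an offered slot goes to the oldest request it can satisfy. */
final class RequestOrderMatcher {

    // requestId -> required slot size; iteration follows insertion (request) order.
    private final LinkedHashMap<String, Integer> pendingRequests = new LinkedHashMap<>();

    void addRequest(String requestId, int requiredSize) {
        pendingRequests.put(requestId, requiredSize);
    }

    /** Returns the id of the fulfilled request, or null if no pending request fits. */
    String offerSlot(int slotSize) {
        Iterator<Map.Entry<String, Integer>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            if (entry.getValue() <= slotSize) {
                String requestId = entry.getKey();
                it.remove(); // fulfilled, so it is no longer pending
                return requestId;
            }
        }
        return null;
    }
}
```

Note that the slot is matched by walking the pending requests from oldest to newest, independent of which request originally triggered the allocation of that slot.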




----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997",
       "triggerID" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2115",
       "triggerID" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3011",
       "triggerID" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3239",
       "triggerID" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3321",
       "triggerID" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9f69cd18655a8256d5bc91720a4a88f54b9a6ef9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9f69cd18655a8256d5bc91720a4a88f54b9a6ef9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7a3940a82c063f6bf74cbc72715e77034afe2492 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3321) 
   * 9f69cd18655a8256d5bc91720a4a88f54b9a6ef9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997",
       "triggerID" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2115",
       "triggerID" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3011",
       "triggerID" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3239",
       "triggerID" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3321",
       "triggerID" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e193166a23d99c7c43c123472a1b23534c383ec8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3239) 
   * 7a3940a82c063f6bf74cbc72715e77034afe2492 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3321) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r433114179



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -698,13 +680,13 @@ boolean offerSlot(
 
 		componentMainThreadExecutor.assertRunningInMainThread();
 
-		final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
+		final PendingRequest pendingRequest = pendingRequests.getKeyB(allocationID);

Review comment:
       Sure, we can have a separate PR to centralize the removal of pending requests.




----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997",
       "triggerID" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2115",
       "triggerID" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3011",
       "triggerID" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3239",
       "triggerID" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3321",
       "triggerID" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9f69cd18655a8256d5bc91720a4a88f54b9a6ef9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3392",
       "triggerID" : "9f69cd18655a8256d5bc91720a4a88f54b9a6ef9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13ae7d728e12b1f349d1cbb1d555882e5d29b298",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3605",
       "triggerID" : "13ae7d728e12b1f349d1cbb1d555882e5d29b298",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4f5b21017451ccae3811069e6534f3fa8aeaf1ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3685",
       "triggerID" : "4f5b21017451ccae3811069e6534f3fa8aeaf1ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab11f37b0a46e1cf35434b55eb08bda539f16258",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3697",
       "triggerID" : "ab11f37b0a46e1cf35434b55eb08bda539f16258",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab11f37b0a46e1cf35434b55eb08bda539f16258 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3697) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441336004



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
 		return null;
 	}
 
+	private void maybeRemapOrphanedAllocation(
+			final AllocationID allocationIdOfRequest,
+			final AllocationID allocationIdOfSlot) {
+
+		final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot)
+			? null : allocationIdOfRequest;
+
+		// if the request that initiated the allocation is still pending, it should take over the orphaned allocation
+		// of the fulfilled request so that it can fail fast if the remapped allocation fails
+		if (orphanedAllocationId != null) {
+			final SlotRequestId requestIdOfAllocatedSlot = pendingRequests.getKeyA(allocationIdOfSlot);
+			if (requestIdOfAllocatedSlot != null) {
+				final PendingRequest requestOfAllocatedSlot = pendingRequests.getByKeyA(requestIdOfAllocatedSlot);
+				requestOfAllocatedSlot.setAllocationId(orphanedAllocationId);
+
+				// this re-insertion of initiatedRequestId will not affect its original insertion order
+				pendingRequests.put(requestIdOfAllocatedSlot, orphanedAllocationId, requestOfAllocatedSlot);
+			} else {
+				// cancel the slot request if the orphaned allocation is not remapped to a pending request.
+				// the request id can be null if the slot is returned by scheduler
+				resourceManagerGateway.cancelSlotRequest(orphanedAllocationId);

Review comment:
       Yes. If such a returned slot fulfills a pending request, the orphaned allocation is not needed by any other pending request, since those still have their own allocations.
   So we can safely cancel it to avoid allocating more slots than needed.
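
The remap-or-cancel decision can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the PR's implementation: ids are plain strings, every slot fits every request, and the `cancelledAllocations` list stands in for the call to the resource manager.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of handling the allocation orphaned by FIFO fulfillment. */
final class OrphanedAllocationSketch {

    // requestId -> allocationId, iterated in request order.
    final LinkedHashMap<String, String> allocationByRequest = new LinkedHashMap<>();
    // allocationId -> requestId, the reverse index.
    final Map<String, String> requestByAllocation = new HashMap<>();
    // Stand-in for cancelling the slot request at the resource manager.
    final List<String> cancelledAllocations = new ArrayList<>();

    void addPendingRequest(String requestId, String allocationId) {
        allocationByRequest.put(requestId, allocationId);
        requestByAllocation.put(allocationId, requestId);
    }

    /** Fulfills the oldest pending request with the offered slot and returns its id. */
    String fulfillOldestRequest(String allocationIdOfSlot) {
        if (allocationByRequest.isEmpty()) {
            return null;
        }
        Map.Entry<String, String> oldest = allocationByRequest.entrySet().iterator().next();
        String fulfilledRequestId = oldest.getKey();
        String allocationIdOfRequest = oldest.getValue();
        allocationByRequest.remove(fulfilledRequestId);
        requestByAllocation.remove(allocationIdOfRequest);

        if (!allocationIdOfRequest.equals(allocationIdOfSlot)) {
            remapOrCancel(allocationIdOfRequest, allocationIdOfSlot);
        }
        return fulfilledRequestId;
    }

    private void remapOrCancel(String orphanedAllocationId, String allocationIdOfSlot) {
        String requestOfSlot = requestByAllocation.remove(allocationIdOfSlot);
        if (requestOfSlot != null) {
            // The request that lost its slot takes over the orphaned allocation;
            // put() on an existing key keeps its position in the request order.
            allocationByRequest.put(requestOfSlot, orphanedAllocationId);
            requestByAllocation.put(orphanedAllocationId, requestOfSlot);
        } else {
            // No pending request is tied to the slot (e.g. a returned slot),
            // so nobody needs the orphaned allocation any more.
            cancelledAllocations.add(orphanedAllocationId);
        }
    }
}
```

In the first branch the remapped request can still fail fast if the orphaned allocation later fails; in the second branch the allocation is cancelled so that no surplus slot is requested.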




----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441412492



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
 		}
 	}
 
+	@Test
+	public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+		final List<AllocationID> allocations = new ArrayList<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+		try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
       Possibly these test classes can share the same test base.




----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r433113663



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java
##########
@@ -103,7 +123,7 @@ private void runSlotRequestCompletionTest(
 			// check that the slot requests get completed in sequential order
 			for (int i = 0; i < slotRequestIds.size(); i++) {
 				final CompletableFuture<PhysicalSlot> slotRequestFuture = slotRequests.get(i);
-				slotRequestFuture.get();
+				assertThat(slotRequestFuture.getNow(null), not(is(nullValue())));

Review comment:
       OK.




----------------------------------------------------------------



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441316261



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##########
@@ -85,4 +85,28 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
 		assertThat(map.getByKeyB(1), is(secondValue));
 		assertThat(map.getByKeyA(2), is(secondValue));
 	}
+
+	@Test
+	public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+		final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+
+		final String value = "foobar";
+		map.put(1, 1, value);
+		map.put(2, 2, value);
+
+		map.put(1, 1, value);
+		assertThat(map.keySetA().iterator().next(), is(1));
+	}
+
+	@Test
+	public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithDifferentSecondaryKey() {
+		final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+
+		final String value = "foobar";
+		map.put(1, 1, value);
+		map.put(2, 2, value);
+
+		map.put(1, 3, value);

Review comment:
       This is tested in `ensuresOneToOneMappingBetweenKeysSameSecondaryKey`. 




----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997",
       "triggerID" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2115",
       "triggerID" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3011",
       "triggerID" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3239",
       "triggerID" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3321",
       "triggerID" : "7a3940a82c063f6bf74cbc72715e77034afe2492",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9f69cd18655a8256d5bc91720a4a88f54b9a6ef9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3392",
       "triggerID" : "9f69cd18655a8256d5bc91720a4a88f54b9a6ef9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "13ae7d728e12b1f349d1cbb1d555882e5d29b298",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3605",
       "triggerID" : "13ae7d728e12b1f349d1cbb1d555882e5d29b298",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 13ae7d728e12b1f349d1cbb1d555882e5d29b298 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3605) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r430441131



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -112,6 +114,9 @@
 	/** The requests that are waiting for the resource manager to be connected. */
 	private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
 
+	/** Maps a request to its allocation. */
+	private final BiMap<SlotRequestId, AllocationID> requestedAllocations;

Review comment:
       Looking at the implementation of `DualKeyLinkedMap` behind `pendingRequests`, it seems we can just remove the first matching `SlotRequestId` and then remap the orphaned `SlotRequestId` to its `AllocationID`. The original insertion order in `DualKeyLinkedMap.aMap` should not be affected. If so, we could remove `requestedAllocations`.
   
   EDIT: `waitingForResourceManager` -> `pendingRequests`
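
The ordering argument in the comment above can be illustrated with a plain `java.util.LinkedHashMap`. This is only a sketch: it does not use Flink's `DualKeyLinkedMap`, and the class name, method name, and the string ids are hypothetical. It assumes the offered slot belongs to the allocation started by `request-2` but is handed to the oldest request, `request-1`, so that `request-2` takes over the orphaned allocation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy illustration only; this is not Flink's DualKeyLinkedMap. */
final class ReinsertionOrderDemo {

    /** Returns the remaining "request->allocation" pairs in iteration order. */
    static List<String> remainingOrderAfterRemap() {
        // Insertion-ordered map: request id -> allocation id (hypothetical strings).
        Map<String, String> pending = new LinkedHashMap<>();
        pending.put("request-1", "allocation-A");
        pending.put("request-2", "allocation-B");
        pending.put("request-3", "allocation-C");

        // The slot of allocation-B fulfills the oldest request, request-1,
        // so request-1 is removed and its own allocation-A becomes orphaned.
        String orphanedAllocation = pending.remove("request-1");

        // Remap request-2 to the orphaned allocation. put() on an existing key
        // replaces the value and keeps the key's original position.
        pending.put("request-2", orphanedAllocation);

        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> entry : pending.entrySet()) {
            result.add(entry.getKey() + "->" + entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(remainingOrderAfterRemap());
    }
}
```

In this toy model `request-2` stays ahead of `request-3` after the remap, which is the property the comment relies on.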







[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 13ae7d728e12b1f349d1cbb1d555882e5d29b298 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3605) 
   * 4f5b21017451ccae3811069e6534f3fa8aeaf1ad Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3685) 
   * ab11f37b0a46e1cf35434b55eb08bda539f16258 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3697) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 13ae7d728e12b1f349d1cbb1d555882e5d29b298 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3605) 
   * 4f5b21017451ccae3811069e6534f3fa8aeaf1ad Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3685) 
   



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441347449



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
 		return null;
 	}
 
+	private void maybeRemapOrphanedAllocation(
+			final AllocationID allocationIdOfRequest,
+			final AllocationID allocationIdOfSlot) {
+
+		final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot)
+			? null : allocationIdOfRequest;
+
+		// if the request that initiated the allocation is still pending, it should take over the orphaned allocation
+		// of the fulfilled request so that it can fail fast if the remapped allocation fails
+		if (orphanedAllocationId != null) {
+			final SlotRequestId requestIdOfAllocatedSlot = pendingRequests.getKeyA(allocationIdOfSlot);
+			if (requestIdOfAllocatedSlot != null) {
+				final PendingRequest requestOfAllocatedSlot = pendingRequests.getByKeyA(requestIdOfAllocatedSlot);
+				requestOfAllocatedSlot.setAllocationId(orphanedAllocationId);
+
+				// this re-insertion of initiatedRequestId will not affect its original insertion order
+				pendingRequests.put(requestIdOfAllocatedSlot, orphanedAllocationId, requestOfAllocatedSlot);
+			} else {
+				// cancel the slot request if the orphaned allocation is not remapped to a pending request.
+				// the request id can be null if the slot is returned by scheduler
+				resourceManagerGateway.cancelSlotRequest(orphanedAllocationId);

Review comment:
       Updated the comments to make them easier to understand.







[GitHub] [flink] GJL commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
GJL commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r430919148



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java
##########
@@ -103,7 +123,7 @@ private void runSlotRequestCompletionTest(
 			// check that the slot requests get completed in sequential order
 			for (int i = 0; i < slotRequestIds.size(); i++) {
 				final CompletableFuture<PhysicalSlot> slotRequestFuture = slotRequests.get(i);
-				slotRequestFuture.get();
+				assertThat(slotRequestFuture.getNow(null), not(is(nullValue())));

Review comment:
       `is(not(nullValue()))` reads nicer.
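
For context on the diff above: replacing `get()` with `getNow(null)` turns the check into a non-blocking one, so the test only passes if the future is already complete. A minimal sketch with the standard `CompletableFuture` API follows; the class and method names are hypothetical and the Hamcrest matchers are left out to keep it self-contained.

```java
import java.util.concurrent.CompletableFuture;

/** Toy illustration of a non-blocking completion check. */
final class GetNowDemo {

    /** Returns true only if the future has already completed with a non-null value. */
    static boolean isAlreadyFulfilled(CompletableFuture<String> future) {
        // getNow never blocks: it returns the fallback (null here) while the future is incomplete.
        return future.getNow(null) != null;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(isAlreadyFulfilled(pending));
        pending.complete("slot");
        System.out.println(isAlreadyFulfilled(pending));
    }
}
```

A blocking `get()` would instead wait for completion, which could hide an ordering problem in a test that is meant to verify that requests are fulfilled immediately and in sequence.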

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -698,13 +680,13 @@ boolean offerSlot(
 
 		componentMainThreadExecutor.assertRunningInMainThread();
 
-		final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
+		final PendingRequest pendingRequest = pendingRequests.getKeyB(allocationID);

Review comment:
       It looks like this should have been in a separate commit.







[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 7a3940a82c063f6bf74cbc72715e77034afe2492 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3321) 
   * 9f69cd18655a8256d5bc91720a4a88f54b9a6ef9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3392) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 9a0102b7e3b412027bb84b16cf79f85e930ceb81 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3011) 
   * e193166a23d99c7c43c123472a1b23534c383ec8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3239) 
   



[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r433607269



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       I think it's a good idea to make it configurable.
   
   Besides reducing the risk for streaming and DataSet jobs, another benefit is that we could also drop the change that remaps orphaned allocations. The remapping exists only so that pending requests can fail fast in `failAllocation(...)`, which makes a difference only for streaming jobs.
   
   Actually, I'm wondering whether we need to keep the fail-fast mechanism in `failAllocation(...)` in the future. It requires the slot pool to differentiate between streaming requests and batch requests. And if a slot pool later contains both batch slots (occupied temporarily) and streaming slots (occupied indefinitely), a failed allocation for a streaming request does not need to fail the request immediately as long as the request is still fulfillable, just as we currently handle batch requests.
   
   What do you think of dropping the commit to remap orphaned allocations?
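
To make the fail-fast argument above concrete, here is a rough sketch of why a pending request needs to stay mapped to some allocation. It is a toy model under stated assumptions, not Flink's `SlotPoolImpl`: the class `FailFastSketch`, its methods, and the string allocation ids are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Toy model only; names are hypothetical and do not mirror Flink's classes. */
final class FailFastSketch {
    // allocation id -> future of the pending request currently mapped to that allocation
    private final Map<String, CompletableFuture<String>> pendingByAllocation = new HashMap<>();

    /** Registers a pending request for the given allocation. */
    CompletableFuture<String> request(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingByAllocation.put(allocationId, future);
        return future;
    }

    /** Moves a pending request from one allocation to another (the "remap" step). */
    void remap(String fromAllocationId, String toAllocationId) {
        CompletableFuture<String> future = pendingByAllocation.remove(fromAllocationId);
        if (future != null) {
            pendingByAllocation.put(toAllocationId, future);
        }
    }

    /** Fails the request mapped to the allocation, if any; returns whether one was failed. */
    boolean failAllocation(String allocationId, Exception cause) {
        CompletableFuture<String> future = pendingByAllocation.remove(allocationId);
        return future != null && future.completeExceptionally(cause);
    }
}
```

In this model, a request whose original allocation was consumed by another request can only be failed quickly if it has been remapped to the orphaned allocation. Without the remap, `failAllocation` finds nothing to fail and the request simply keeps waiting.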







[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436540565



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       On second thought, I would prefer not to make fulfilling requests in request order configurable.
   The FIFO order works for any scheduling strategy and is even an improvement for lazy from source scheduling. There was a ticket with the same purpose, "[FLINK-13165] Complete slot requests in request order", although it did not fully achieve that in the end. It introduced `LinkedHashMap`, which eases this PR, and `SlotPoolRequestCompletionTest`, which can be reused.
   This PR is more like a fix to FLINK-13165.
   
   The orphaned allocation remapping also ensures that the fail-fast route will not break with this change.
   
   What do you think?
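
As a rough illustration of the FIFO fulfillment discussed above, the following sketch hands each offered slot to the oldest pending request that it can satisfy. It is a simplified model, not the PR's implementation: `FifoSlotPoolSketch`, its methods, and the integer "slot size" standing in for a resource profile are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only; names and types are hypothetical, not Flink's SlotPoolImpl. */
final class FifoSlotPoolSketch {
    // Pending requests in request order: request id -> required slot size.
    private final Map<String, Integer> pendingRequests = new LinkedHashMap<>();
    private final List<String> fulfilledInOrder = new ArrayList<>();

    void requestSlot(String requestId, int requiredSize) {
        pendingRequests.put(requestId, requiredSize);
    }

    /** Hands the offered slot to the oldest pending request it can satisfy. */
    boolean offerSlot(int offeredSize) {
        Iterator<Map.Entry<String, Integer>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            if (entry.getValue() <= offeredSize) {
                fulfilledInOrder.add(entry.getKey());
                it.remove();
                return true;
            }
        }
        // No pending request fits; a real pool would keep the slot as available.
        return false;
    }

    List<String> fulfilledInOrder() {
        return fulfilledInOrder;
    }
}
```

Because the map iterates in insertion order, the order in which slots arrive does not let a later request overtake an earlier one that the slot could also satisfy.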







[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 9f69cd18655a8256d5bc91720a4a88f54b9a6ef9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3392) 
   * 13ae7d728e12b1f349d1cbb1d555882e5d29b298 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3605) 
   



[GitHub] [flink] zhuzhurk commented on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-633531282


   @flinkbot run azure





[GitHub] [flink] zhuzhurk commented on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-641975174


   @flinkbot run azure





[GitHub] [flink] flinkbot commented on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632058001


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1 (Thu May 21 12:27:00 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550









[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 7a3940a82c063f6bf74cbc72715e77034afe2492 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3321) 
   





[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r440767961



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -22,12 +22,16 @@
 
 import java.util.AbstractCollection;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Set;
 
 /**
- * Map which stores values under two different indices.
+ * Map which stores values under two different indices. The mapping of the primary key to the
+ * value is backed by {@link LinkedHashMap} so that the iteration order over the values and
+ * the primary key set is the insertion order. Note that there is no contract of the iteration
+ * order over the secondary key set.

Review comment:
       Good suggestion.







[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441347521



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##########
@@ -85,4 +85,28 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
 		assertThat(map.getByKeyB(1), is(secondValue));
 		assertThat(map.getByKeyA(2), is(secondValue));
 	}
+
+	@Test
+	public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+		final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+
+		final String value = "foobar";
+		map.put(1, 1, value);
+		map.put(2, 2, value);
+
+		map.put(1, 1, value);
+		assertThat(map.keySetA().iterator().next(), is(1));

Review comment:
       True. Added verifications for values.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -74,6 +74,20 @@ public V getKeyB(B bKey) {
 		}
 	}
 
+	public A getKeyA(B bKey) {
+		return bMap.get(bKey);
+	}
+
+	public B getKeyB(A aKey) {

Review comment:
       done.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -74,6 +74,20 @@ public V getKeyB(B bKey) {
 		}
 	}
 
+	public A getKeyA(B bKey) {

Review comment:
       done.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -64,7 +64,7 @@ public V getKeyA(A aKey) {
 		}
 	}
 
-	public V getKeyB(B bKey) {
+	public V getByKeyB(B bKey) {

Review comment:
       done.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -54,7 +54,7 @@ public int size() {
 		return aMap.size();
 	}
 
-	public V getKeyA(A aKey) {
+	public V getByKeyA(A aKey) {

Review comment:
       done.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -22,12 +22,16 @@
 
 import java.util.AbstractCollection;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Set;
 
 /**
- * Map which stores values under two different indices.
+ * Map which stores values under two different indices. The mapping of the primary key to the
+ * value is backed by {@link LinkedHashMap} so that the iteration order over the values and
+ * the primary key set is the insertion order. Note that there is no contract of the iteration
+ * order over the secondary key set.

Review comment:
       done.







[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r435362084



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       There is this `UnfulfillableSlotRequestException`, which is still a fail-fast route if the RM finds that a certain request profile cannot be fulfilled at all with any existing slot and a new slot cannot be allocated for it. As far as I can see, it is relevant for batch, streaming and bulk requests alike. I do not know the whole background of this. At first glance it seems to be a complication, but it is probably necessary to avoid waiting for the timeout before canceling everything when it is already clear that the allocation cannot succeed.







[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997",
       "triggerID" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2115",
       "triggerID" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997) 
   * a3a4256ccc6300fcac27d7625461c078a1510604 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2115) 
   





[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436533148



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       Yes, we still need the remapping to allow fail-fast on `UnfulfillableSlotRequestException`.
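
To illustrate the remapping discussed here, the following is a minimal sketch and not the code in `SlotPoolImpl`. It assumes a much simplified pool in which plain strings stand in for `SlotRequestId` and `AllocationID`; the class `RequestOrderSketch` and its methods `request`, `offerSlot` and `failAllocation` are hypothetical names. An offered slot goes to the oldest pending request, and if that request did not initiate the slot's allocation, the request that did takes over the orphaned allocation, so that a later failure of that allocation still reaches a pending request.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of a slot pool; strings stand in for the ID types. */
final class RequestOrderSketch {

    // request id -> allocation id, iterated in request (insertion) order
    private final LinkedHashMap<String, String> pending = new LinkedHashMap<>();
    final List<String> fulfilled = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    void request(String requestId, String allocationId) {
        pending.put(requestId, allocationId);
    }

    /** Hands an offered slot to the oldest pending request, whichever allocation it carries. */
    void offerSlot(String allocationIdOfSlot) {
        final Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
        if (!it.hasNext()) {
            return;
        }
        final Map.Entry<String, String> oldest = it.next();
        final String requestId = oldest.getKey();
        final String allocationIdOfRequest = oldest.getValue();
        it.remove();
        fulfilled.add(requestId);

        if (!allocationIdOfRequest.equals(allocationIdOfSlot)) {
            // The fulfilled request leaves its own allocation orphaned. If the request that
            // initiated the offered slot's allocation is still pending, it takes over the orphan.
            for (Map.Entry<String, String> entry : pending.entrySet()) {
                if (entry.getValue().equals(allocationIdOfSlot)) {
                    entry.setValue(allocationIdOfRequest);
                    break;
                }
            }
        }
    }

    /** Fails the pending request currently mapped to the given allocation, if there is one. */
    void failAllocation(String allocationId) {
        final Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(allocationId)) {
                failed.add(entry.getKey());
                it.remove();
                return;
            }
        }
    }

    public static void main(String[] args) {
        final RequestOrderSketch pool = new RequestOrderSketch();
        pool.request("r1", "a1");
        pool.request("r2", "a2");
        pool.offerSlot("a2");      // r1 is fulfilled first; r2 is remapped to a1
        pool.failAllocation("a1"); // the failure reaches r2 through the remapped allocation
        System.out.println(pool.fulfilled + " " + pool.failed); // prints [r1] [r2]
    }
}
```

In this model, offering the slot for `a2` fulfills `r1` and remaps `r2` to `a1`. A subsequent failure of `a1` then fails `r2` directly; without the remapping, no pending request would be associated with `a1` and the failure would go unnoticed.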
   







[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441432435



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
 		}
 	}
 
+	@Test
+	public void testOrphanedAllocationCanBeRemapped() throws Exception {
+		try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+			final List<AllocationID> allocationIds = new ArrayList<>();
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+			final List<AllocationID> canceledAllocations = new ArrayList<>();
+			resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+			setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+			final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
+
+			final SlotRequestId slotRequestId1 = new SlotRequestId();
+			scheduler.allocateSlot(
+				slotRequestId1,
+				new DummyScheduledUnit(),
+				SlotProfile.noRequirements(),
+				timeout);

Review comment:
       Done by introducing `requestNewAllocatedSlots(SlotPool, SlotRequestId...)`.







[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441347160



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
 		return null;
 	}
 
+	private void maybeRemapOrphanedAllocation(
+			final AllocationID allocationIdOfRequest,
+			final AllocationID allocationIdOfSlot) {
+
+		final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot)
+			? null : allocationIdOfRequest;
+
+		// if the request that initiated the allocation is still pending, it should take over the orphaned allocation
+		// of the fulfilled request so that it can fail fast if the remapped allocation fails
+		if (orphanedAllocationId != null) {

Review comment:
       done.







[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r438824119



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       OK, so FLINK-13165 makes slot requests complete in order only if the offers come with unknown `AllocationID`s, right? Generally, we expect the RM to keep each `AllocationID` matched to its `SlotRequestId`. I am fine with breaking the tie `SlotRequestId->AllocationID` if there are no known consequences. Eventually, I hope it might even help to simplify the `SlotPoolImpl`.







[GitHub] [flink] zhuzhurk removed a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk removed a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-642606221


   @flinkbot run azure





[GitHub] [flink] zhuzhurk removed a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk removed a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-641975174


   @flinkbot run azure





[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441431939



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
 		}
 	}
 
+	@Test
+	public void testOrphanedAllocationCanBeRemapped() throws Exception {
+		try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+			final List<AllocationID> allocationIds = new ArrayList<>();
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+			final List<AllocationID> canceledAllocations = new ArrayList<>();
+			resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+			setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+			final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

Review comment:
       You are right. I have dropped the scheduler in the newly added cases.
   However, this unnecessary complication is a common problem in most of the SlotPool tests. I think we can simplify them in a separate task as well.







[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997",
       "triggerID" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997) 
   * a3a4256ccc6300fcac27d7625461c078a1510604 UNKNOWN
   





[GitHub] [flink] flinkbot commented on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1 UNKNOWN
   





[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r438835441



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -37,20 +44,20 @@
 
 	private final LinkedHashMap<A, Tuple2<B, V>> aMap;
 
-	private final LinkedHashMap<B, A> bMap;
+	private final HashMap<B, A> bMap;
 
 	private transient Collection<V> values;
 
 	public DualKeyLinkedMap(int initialCapacity) {

Review comment:
       Makes sense to me







[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997",
       "triggerID" : "30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2115",
       "triggerID" : "a3a4256ccc6300fcac27d7625461c078a1510604",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3011",
       "triggerID" : "9a0102b7e3b412027bb84b16cf79f85e930ceb81",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e193166a23d99c7c43c123472a1b23534c383ec8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9a0102b7e3b412027bb84b16cf79f85e930ceb81 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3011) 
   * e193166a23d99c7c43c123472a1b23534c383ec8 UNKNOWN
   





[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r435362084



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       There is this `UnfulfillableSlotRequestException`, which is still a fail-fast route if the RM finds that a certain request profile cannot be fulfilled at all with any existing slot and a new slot cannot be allocated for it. As far as I can see, it is relevant for batch, streaming and bulk requests alike. I do not know the whole background of this. At first glance it seems to be a complication, but it might be necessary.







[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r430441131



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -112,6 +114,9 @@
 	/** The requests that are waiting for the resource manager to be connected. */
 	private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
 
+	/** Maps a request to its allocation. */
+	private final BiMap<SlotRequestId, AllocationID> requestedAllocations;

Review comment:
       Looking into the implementation of `DualKeyLinkedMap` for `waitingForResourceManager`, it seems we can just remove the first matching `SlotRequestId` and then remap the orphaned `SlotRequestId` to its `AllocationID`. The original insertion order in `DualKeyLinkedMap.aMap` should not be affected. If so, we could remove `requestedAllocations`.
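
The ordering assumption in this comment can be checked against plain `java.util.LinkedHashMap`, which the diff shows backing `aMap`. The sketch below is illustrative only: the class `ReinsertionOrderSketch` and the string IDs are hypothetical, and it exercises the JDK map rather than `DualKeyLinkedMap` itself. It relies on the documented behaviour that, in the default insertion-ordered mode, re-inserting an existing key does not change its position.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReinsertionOrderSketch {

    /** Returns the primary keys in iteration order after one of them has been remapped. */
    static List<String> keysAfterRemap() {
        // primary key (request) -> secondary key (allocation); the IDs are made up
        final Map<String, String> requestToAllocation = new LinkedHashMap<>();
        requestToAllocation.put("request-1", "allocation-1");
        requestToAllocation.put("request-2", "allocation-2");

        // Remapping an existing primary key only replaces its value;
        // the key keeps the position of its first insertion.
        requestToAllocation.put("request-1", "allocation-3");

        return new ArrayList<>(requestToAllocation.keySet());
    }

    public static void main(String[] args) {
        System.out.println(keysAfterRemap()); // prints [request-1, request-2]
    }
}
```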

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       I am not sure about all the consequences of this change for the existing scheduling. I mean that we no longer respect `SlotRequestId->AllocationID` when accepting the slot offer. Would it make sense to keep this behaviour configurable for now, depending on the scheduling strategy? Or is this complication not needed?







[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441351136



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -37,13 +41,13 @@
 
 	private final LinkedHashMap<A, Tuple2<B, V>> aMap;
 
-	private final LinkedHashMap<B, A> bMap;
+	private final HashMap<B, A> bMap;

Review comment:
       I mean it is confusing to developers. Using a `LinkedHashMap` for `bMap` suggests that it behaves differently from a plain `Map` here, but it actually does not.







[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r440693739



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -22,12 +22,16 @@
 
 import java.util.AbstractCollection;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Set;
 
 /**
- * Map which stores values under two different indices.
+ * Map which stores values under two different indices. The mapping of the primary key to the
+ * value is backed by {@link LinkedHashMap} so that the iteration order over the values and
+ * the primary key set is the insertion order. Note that there is no contract of the iteration
+ * order over the secondary key set.

Review comment:
       I would also explicitly document that `@param <A>` is the primary key.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -37,13 +41,13 @@
 
 	private final LinkedHashMap<A, Tuple2<B, V>> aMap;
 
-	private final LinkedHashMap<B, A> bMap;
+	private final HashMap<B, A> bMap;

Review comment:
       Does it matter for this PR which type `bMap` has?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##########
@@ -85,4 +85,28 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
 		assertThat(map.getByKeyB(1), is(secondValue));
 		assertThat(map.getByKeyA(2), is(secondValue));
 	}
+
+	@Test
+	public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+		final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+
+		final String value = "foobar";
+		map.put(1, 1, value);
+		map.put(2, 2, value);
+
+		map.put(1, 1, value);
+		assertThat(map.keySetA().iterator().next(), is(1));

Review comment:
       I think `values()` are also interesting to check for both added tests.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -74,6 +74,20 @@ public V getKeyB(B bKey) {
 		}
 	}
 
+	public A getKeyA(B bKey) {

Review comment:
       ```suggestion
   	public A getKeyAByKeyB(B bKey) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -64,7 +64,7 @@ public V getKeyA(A aKey) {
 		}
 	}
 
-	public V getKeyB(B bKey) {
+	public V getByKeyB(B bKey) {

Review comment:
       ```suggestion
   	public V getValueByKeyB(B bKey) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -54,7 +54,7 @@ public int size() {
 		return aMap.size();
 	}
 
-	public V getKeyA(A aKey) {
+	public V getByKeyA(A aKey) {

Review comment:
       ```suggestion
   	public V getValueByKeyA(A aKey) {
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##########
@@ -85,4 +85,28 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
 		assertThat(map.getByKeyB(1), is(secondValue));
 		assertThat(map.getByKeyA(2), is(secondValue));
 	}
+
+	@Test
+	public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+		final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+
+		final String value = "foobar";
+		map.put(1, 1, value);
+		map.put(2, 2, value);
+
+		map.put(1, 1, value);
+		assertThat(map.keySetA().iterator().next(), is(1));
+	}
+
+	@Test
+	public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithDifferentSecondaryKey() {
+		final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+
+		final String value = "foobar";
+		map.put(1, 1, value);
+		map.put(2, 2, value);
+
+		map.put(1, 3, value);

Review comment:
       Do we also want to check the cleanup of key B `3` if it were already in the map?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
 		return null;
 	}
 
+	private void maybeRemapOrphanedAllocation(
+			final AllocationID allocationIdOfRequest,
+			final AllocationID allocationIdOfSlot) {
+
+		final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot)
+			? null : allocationIdOfRequest;
+
+		// if the request that initiated the allocation is still pending, it should take over the orphaned allocation
+		// of the fulfilled request so that it can fail fast if the remapped allocation fails
+		if (orphanedAllocationId != null) {

Review comment:
       ```suggestion
   		// if the request that initiated the allocation is still pending, it should take over the orphaned allocation
   		// of the fulfilled request so that it can fail fast if the remapped allocation fails
   		if (!allocationIdOfRequest.equals(allocationIdOfSlot)) {
   		    final AllocationID orphanedAllocationId = allocationIdOfRequest;
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -74,6 +74,20 @@ public V getKeyB(B bKey) {
 		}
 	}
 
+	public A getKeyA(B bKey) {
+		return bMap.get(bKey);
+	}
+
+	public B getKeyB(A aKey) {

Review comment:
       ```suggestion
   	public B getKeyBByKeyA(A aKey) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
 		return null;
 	}
 
+	private void maybeRemapOrphanedAllocation(
+			final AllocationID allocationIdOfRequest,
+			final AllocationID allocationIdOfSlot) {
+
+		final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot)
+			? null : allocationIdOfRequest;
+
+		// if the request that initiated the allocation is still pending, it should take over the orphaned allocation
+		// of the fulfilled request so that it can fail fast if the remapped allocation fails
+		if (orphanedAllocationId != null) {
+			final SlotRequestId requestIdOfAllocatedSlot = pendingRequests.getKeyA(allocationIdOfSlot);
+			if (requestIdOfAllocatedSlot != null) {
+				final PendingRequest requestOfAllocatedSlot = pendingRequests.getByKeyA(requestIdOfAllocatedSlot);
+				requestOfAllocatedSlot.setAllocationId(orphanedAllocationId);
+
+				// this re-insertion of initiatedRequestId will not affect its original insertion order
+				pendingRequests.put(requestIdOfAllocatedSlot, orphanedAllocationId, requestOfAllocatedSlot);
+			} else {
+				// cancel the slot request if the orphaned allocation is not remapped to a pending request.
+				// the request id can be null if the slot is returned by scheduler
+				resourceManagerGateway.cancelSlotRequest(orphanedAllocationId);

Review comment:
       Is this about previously allocated slots returned by the scheduler, e.g. after tasks finish?
   Do they also have to be cancelled in the RM?







[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436540565



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       On second thought, I would prefer not to make fulfilling requests in request order configurable.
   The FIFO order works for any scheduling strategy and is even an improvement for lazy-from-source scheduling. It is not just for pipelined region scheduling.
   There was once a ticket/PR with the same purpose, "[FLINK-13165] Complete slot requests in request order", although it did not fully make it in the end. `LinkedHashMap` was introduced by it and eases this PR. `SlotPoolRequestCompletionTest` was also introduced by it and can be reused.
   
   The orphaned allocation remapping also ensures that the fail-fast route will not break with this change.
   
   What do you think?
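
The combination of FIFO fulfillment and orphaned allocation remapping discussed here can be sketched as follows. This is a simplified, hypothetical model and not the `SlotPoolImpl` code: the class `FifoSlotPoolSketch` and its methods are made up for illustration, IDs are plain strings, and resource profile matching and the cancellation of unmapped allocations at the ResourceManager are left out.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model: pending requests are kept in request order, and each
// request remembers the allocation it is currently waiting on.
final class FifoSlotPoolSketch {

    // requestId -> allocationId, iterated in request (insertion) order.
    private final LinkedHashMap<String, String> pendingRequests = new LinkedHashMap<>();

    void request(String requestId, String allocationId) {
        pendingRequests.put(requestId, allocationId);
    }

    /** Offers a slot; returns the id of the fulfilled request, or null if none is pending. */
    String offerSlot(String allocationIdOfSlot) {
        Iterator<Map.Entry<String, String>> it = pendingRequests.entrySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        // The oldest pending request wins, no matter which request triggered the allocation.
        Map.Entry<String, String> oldest = it.next();
        String fulfilledRequestId = oldest.getKey();
        String allocationIdOfRequest = oldest.getValue();
        it.remove();

        if (!allocationIdOfRequest.equals(allocationIdOfSlot)) {
            // The fulfilled request leaves its own allocation orphaned. If the request that
            // initiated the offered slot is still pending, it takes over the orphaned
            // allocation so that a later failure of that allocation still reaches it.
            for (Map.Entry<String, String> entry : pendingRequests.entrySet()) {
                if (entry.getValue().equals(allocationIdOfSlot)) {
                    // Updating the value in place keeps the request's position in the order.
                    entry.setValue(allocationIdOfRequest);
                    break;
                }
            }
        }
        return fulfilledRequestId;
    }

    /** Fails the request waiting on the given allocation; returns its id, or null if none. */
    String failAllocation(String allocationId) {
        Iterator<Map.Entry<String, String>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(allocationId)) {
                it.remove();
                return entry.getKey();
            }
        }
        return null;
    }
}
```

With requests `r1` (allocation `a1`) and `r2` (allocation `a2`), offering the slot for `a2` fulfills `r1` first; `r2` is remapped to `a1`, so a later failure of `a1` fails `r2` right away instead of leaving it to time out.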







[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 4f5b21017451ccae3811069e6534f3fa8aeaf1ad Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3685) 
   * ab11f37b0a46e1cf35434b55eb08bda539f16258 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3697) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * e193166a23d99c7c43c123472a1b23534c383ec8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3239) 
   * 7a3940a82c063f6bf74cbc72715e77034afe2492 UNKNOWN
   





[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441952039



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
 		}
 	}
 
+	@Test
+	public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+		final List<AllocationID> allocations = new ArrayList<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+		try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
       FLINK-18355 is opened to simplify the tests.







[GitHub] [flink] tillrohrmann commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r438843898



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       I think it should be fine to remap `SlotRequestIds` and `AllocationIds`. In the end it is an implementation detail of the `SlotPoolImpl` how a `SlotRequestId` is mapped to an `AllocationId` and no user of the `SlotPool` should rely on/use it. The only important bit is to assign the orphaned `AllocationId` so that we can react to signals from the `ResourceManager`.







[GitHub] [flink] zhuzhurk commented on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-641215406


   @flinkbot run azure





[GitHub] [flink] zhuzhurk removed a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk removed a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-641215406


   @flinkbot run azure





[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r435362084



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       There is this `UnfulfillableSlotRequestException`, which is still a fail-fast route if the RM finds that a certain request profile cannot be fulfilled at all with any existing slot and cannot be allocated. As far as I can see, it is relevant for batch, streaming, and bulk alike. I do not know the whole background of this. At first glance, this looks to me like an optimisation that complicates things a bit at the moment. It is probably necessary to avoid waiting for a timeout before cancelling everything when it is already clear that the allocation can never succeed.







[GitHub] [flink] azagrebin commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441364374



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -37,13 +41,13 @@
 
 	private final LinkedHashMap<A, Tuple2<B, V>> aMap;
 
-	private final LinkedHashMap<B, A> bMap;
+	private final HashMap<B, A> bMap;

Review comment:
       Alright







[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 13ae7d728e12b1f349d1cbb1d555882e5d29b298 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3605) 
   * 4f5b21017451ccae3811069e6534f3fa8aeaf1ad UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12278:
URL: https://github.com/apache/flink/pull/12278#issuecomment-632060550


   ## CI report:
   
   * 30dbfc08b3a4147d18dc00aebb2e31773f1fdbc1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1997) 
   





[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r439243225



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -648,26 +648,8 @@ boolean offerSlot(
 			slotOffer.getResourceProfile(),
 			taskManagerGateway);
 
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
       Thanks for confirming it.







[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441412492



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
 		}
 	}
 
+	@Test
+	public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+		final List<AllocationID> allocations = new ArrayList<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+		try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
       Possibly these test classes can share the same test base, so that the util methods and fields can be reused.



