Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/22 14:41:34 UTC

[GitHub] [flink] XComp opened a new pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

XComp opened a new pull request #14465:
URL: https://github.com/apache/flink/pull/14465


   ## What is the purpose of the change
   
   The purpose of this PR is to remove `DefaultExecutionSlotAllocator` and `DefaultExecutionSlotAllocatorFactory` and replace them with `SlotSharingExecutionSlotAllocator` and its factory class.
   
   ## Brief change log
   
   * TODO
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 23d374243c9726e72ec4b1e1e87739739cded4b9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378",
       "triggerID" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384",
       "triggerID" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 23d374243c9726e72ec4b1e1e87739739cded4b9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342) 
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 5dfaefedb78d18a8cddda978cf121eacdb481789 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378) 
   * 6898634d33237f76e7197479432fd7840f3a8a83 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378",
       "triggerID" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384",
       "triggerID" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 6898634d33237f76e7197479432fd7840f3a8a83 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384) 
   





[GitHub] [flink] tillrohrmann commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r547348834



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
##########
@@ -179,10 +176,10 @@ private SchedulerBase createScheduler(
 			final JobVertex... vertices) throws Exception {
 
 		final JobGraph jobGraph = new JobGraph(new JobID(), "test job", vertices);
-		final SlotProvider slotProvider = new TestingSlotProvider(
-			ignored -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
-		final SchedulerBase scheduler = SchedulerTestingUtils
-			.newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, Time.seconds(10))
+		final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+			.setExecutionSlotAllocatorFactory(SchedulerTestingUtils.slotSharingExecutionSlotAllocatorFactoryBuilder()

Review comment:
       Can't this be the default? Or is it the custom allocation timeout we are setting here?







[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378",
       "triggerID" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384",
       "triggerID" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "triggerType" : "PUSH"
     }, {
       "hash" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458",
       "triggerID" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f71cb168af01508ae680a5353e8c159015275f42",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11463",
       "triggerID" : "f71cb168af01508ae680a5353e8c159015275f42",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 353160767d25e932e5f650a335574ca2eb9a88e1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458) 
   * f71cb168af01508ae680a5353e8c159015275f42 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11463) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378",
       "triggerID" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384",
       "triggerID" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 5dfaefedb78d18a8cddda978cf121eacdb481789 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378) 
   * 6898634d33237f76e7197479432fd7840f3a8a83 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384) 
   





[GitHub] [flink] tillrohrmann commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549684014



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -136,6 +136,16 @@ public static DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutio
 		return newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(
 			jobGraph,
 			new TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode.SUCCESS),
+			allocationTimeout);
+	}
+
+	public static DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(

Review comment:
       No, a builder is better. I think the comment originates from going through the commits in creation order and at this point in time there wasn't a builder created yet.







[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549693989



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -56,8 +57,11 @@
 	private final CountDownLatch allocateLatch;
 	private volatile int availableSlotCount;
 
-	private TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode physicalSlotFutureCompletion, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, int availableSlotCount) {
+	private final QuinConsumer<CompletableFuture<TestingPhysicalSlot>, AllocationID, TaskManagerLocation, TaskManagerGateway, ResourceProfile> physicalSlotCreator;

Review comment:
       I refactored it to use a `TriFunctionWithException` returning a `CompletableFuture` now. I decided to return `CompletableFuture` instances as there are also use-cases where the slot creation is handled within the test itself.
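The refactoring described in this comment can be illustrated with a minimal, self-contained sketch. It is not the code from the PR: `ThrowingTriFunction` is a hypothetical local stand-in for Flink's `TriFunctionWithException`, and plain strings stand in for `TaskManagerLocation`, `TaskManagerGateway` and `ResourceProfile`. The sketch only shows the idea of a three-argument creator that returns a `CompletableFuture`, with a thrown exception turned into an exceptionally completed future.

```java
import java.util.concurrent.CompletableFuture;

final class SlotCreatorSketch {

    /** Hypothetical stand-in for a three-argument function that may throw. */
    @FunctionalInterface
    interface ThrowingTriFunction<A, B, C, R> {
        R apply(A a, B b, C c) throws Exception;
    }

    /**
     * Calls the creator and returns its future. If the creator throws instead
     * of returning, the failure is wrapped in an exceptionally completed future
     * so that callers always receive a future.
     */
    static <A, B, C, R> CompletableFuture<R> create(
            ThrowingTriFunction<A, B, C, CompletableFuture<R>> creator, A a, B b, C c) {
        try {
            return creator.apply(a, b, c);
        } catch (Throwable t) {
            CompletableFuture<R> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }
}
```

Because the creator returns a future rather than a finished slot, a test can also hand out an incomplete future and complete it later, which appears to be the use case the comment refers to.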







[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549776451



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -69,71 +63,43 @@ private TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.Physic
 		this.requests = new HashMap<>();
 		this.responses = new HashMap<>();
 		this.cancellations = new HashMap<>();
-
-		this.availableSlotCount = availableSlotCount;
-		this.allocateLatch = new CountDownLatch(availableSlotCount);
 	}
 
 	@Override
 	public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(PhysicalSlotRequest physicalSlotRequest) {
 		SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
 		requests.put(slotRequestId, physicalSlotRequest);
-		CompletableFuture<TestingPhysicalSlot> resultFuture = new CompletableFuture<>();
-		responses.put(slotRequestId, resultFuture);
-
-		switch (physicalSlotFutureCompletion) {
-			case SUCCESS:
-				if (availableSlotCount > 0) {
-					completePhysicalSlotFutureFor(slotRequestId, new AllocationID());
-				} else {
-					resultFuture.completeExceptionally(new NoResourceAvailableException());
-				}
-				break;
-			case FAILURE:
-				resultFuture.completeExceptionally(new FlinkException("Test failure."));
-				break;
+		CompletableFuture<TestingPhysicalSlot> resultFuture;
+		try {
+			resultFuture = physicalSlotCreator.apply(
+				taskManagerLocation,
+				taskManagerGateway,
+				physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile());
+		} catch (Throwable t) {
+			resultFuture = FutureUtils.completedExceptionally(t);

Review comment:
       Yeah, you're right. I should have used the `TriFunction` right away.







[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549678847



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -104,6 +107,7 @@ void completePhysicalSlotFutureFor(
 				resourceProfile);
 			responses.get(slotRequestId).complete(physicalSlot);
 			availableSlotCount--;
+			allocateLatch.countDown();

Review comment:
       Yes, it was actually used in `ExecutionTest` to wait for all requests to be finished. I refactored it as we have all the futures available anyway.
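Waiting on the futures themselves instead of a `CountDownLatch` could look roughly like the sketch below. It is an illustration only, not the change made in the PR: the class and method names are hypothetical, and it relies on nothing beyond `CompletableFuture.allOf` from the JDK.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AwaitAllRequestsSketch {

    /**
     * Returns a future that completes once every tracked response future has
     * completed, so no separate latch has to be counted down per request.
     */
    static CompletableFuture<Void> allCompleted(List<? extends CompletableFuture<?>> responses) {
        return CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0]));
    }
}
```

A test would then call `allCompleted(...).join()` (or `get` with a timeout) where it previously awaited the latch.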







[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2147c4b63bfdf589e2c5ab167b0c41cacc7a514c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 23d374243c9726e72ec4b1e1e87739739cded4b9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342) 
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   





[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549324817



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
##########
@@ -179,10 +176,10 @@ private SchedulerBase createScheduler(
 			final JobVertex... vertices) throws Exception {
 
 		final JobGraph jobGraph = new JobGraph(new JobID(), "test job", vertices);
-		final SlotProvider slotProvider = new TestingSlotProvider(
-			ignored -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
-		final SchedulerBase scheduler = SchedulerTestingUtils
-			.newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, Time.seconds(10))
+		final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+			.setExecutionSlotAllocatorFactory(SchedulerTestingUtils.slotSharingExecutionSlotAllocatorFactoryBuilder()

Review comment:
       Yes, I want to consider the custom allocation timeout here.







[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * 23d374243c9726e72ec4b1e1e87739739cded4b9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342) 
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 5dfaefedb78d18a8cddda978cf121eacdb481789 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 353160767d25e932e5f650a335574ca2eb9a88e1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458) 
   * f71cb168af01508ae680a5353e8c159015275f42 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11463) 
   * 51dad931454e096b565e35daf9342209679271bf UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] XComp edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-751693135


   > I think the refactoring looks good to me. One thing I was wondering whether we can make the configuration of the `SlotSharingExecutionSlotAllocator` a bit easier or whether the nested builder is good enough for it.
   
   I counted 6 different parameter combinations which would need to be covered
   - `()` (i.e. no parameters for customization)
   - `(physicalSlotProvider, allocationTimeout)`
   - `(slotSharingStrategyFactory)`
   - `(availableSlotCount:int)`
   - `(physicalSlotProvider)`
   - `(allocationTimeout)`
   
   Hence, I considered the builder easier to maintain in `SchedulerTestingUtils` than a separate factory method for each combination.
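
The trade-off described above can be illustrated with a minimal, self-contained sketch. All names here (`AllocatorConfigSketch`, the `with...` methods, the string placeholders standing in for the provider and strategy) are hypothetical and are not the actual `SchedulerTestingUtils` API. The sketch only shows why one builder with defaults can cover all of the listed combinations without a separate overload for each.

```java
import java.time.Duration;

// Hypothetical stand-in for a test-only allocator configuration; these are
// not Flink's actual classes or method names.
final class AllocatorConfigSketch {
    final String physicalSlotProvider;
    final Duration allocationTimeout;
    final String slotSharingStrategy;

    private AllocatorConfigSketch(Builder builder) {
        this.physicalSlotProvider = builder.physicalSlotProvider;
        this.allocationTimeout = builder.allocationTimeout;
        this.slotSharingStrategy = builder.slotSharingStrategy;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Every field has a default, so a test only sets what it cares about.
        private String physicalSlotProvider = "default-provider";
        private Duration allocationTimeout = Duration.ofSeconds(10);
        private String slotSharingStrategy = "default-strategy";

        Builder withPhysicalSlotProvider(String provider) {
            this.physicalSlotProvider = provider;
            return this;
        }

        Builder withAllocationTimeout(Duration timeout) {
            this.allocationTimeout = timeout;
            return this;
        }

        Builder withSlotSharingStrategy(String strategy) {
            this.slotSharingStrategy = strategy;
            return this;
        }

        AllocatorConfigSketch build() {
            return new AllocatorConfigSketch(this);
        }
    }
}
```

A test that only needs a custom timeout would call `AllocatorConfigSketch.newBuilder().withAllocationTimeout(Duration.ofSeconds(1)).build()` and inherit the remaining defaults. Adding a seventh combination would then require no new method.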





[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549797538



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link PhysicalSlotProvider} implementation that can be used in tests.
+ */
+public class TestingPhysicalSlotProvider implements PhysicalSlotProvider {
+
+	private final Map<SlotRequestId, PhysicalSlotRequest> requests;
+	private final Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> responses;
+	private final Map<SlotRequestId, Throwable> cancellations;
+
+	private final TaskManagerLocation taskManagerLocation;
+	private final TaskManagerGateway taskManagerGateway;
+
+	private final TriFunctionWithException<TaskManagerLocation, TaskManagerGateway, ResourceProfile, CompletableFuture<TestingPhysicalSlot>, Throwable> physicalSlotCreator;
+
+	private TestingPhysicalSlotProvider(TriFunctionWithException<TaskManagerLocation, TaskManagerGateway, ResourceProfile, CompletableFuture<TestingPhysicalSlot>, Throwable> physicalSlotCreator, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway) {
+		this.physicalSlotCreator = physicalSlotCreator;
+
+		this.taskManagerLocation = taskManagerLocation;
+		this.taskManagerGateway = taskManagerGateway;
+
+		this.requests = new HashMap<>();
+		this.responses = new HashMap<>();
+		this.cancellations = new HashMap<>();
+	}
+
+	@Override
+	public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(PhysicalSlotRequest physicalSlotRequest) {
+		SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+		requests.put(slotRequestId, physicalSlotRequest);
+		CompletableFuture<TestingPhysicalSlot> resultFuture;
+		try {
+			resultFuture = physicalSlotCreator.apply(
+				taskManagerLocation,
+				taskManagerGateway,
+				physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile());

Review comment:
       Now I get it. Thanks for the clarification. I also removed the Builder as part of this refactoring, since the creator function is the only parameter left.







[GitHub] [flink] tillrohrmann commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549766548



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -69,71 +63,43 @@ private TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.Physic
 		this.requests = new HashMap<>();
 		this.responses = new HashMap<>();
 		this.cancellations = new HashMap<>();
-
-		this.availableSlotCount = availableSlotCount;
-		this.allocateLatch = new CountDownLatch(availableSlotCount);
 	}
 
 	@Override
 	public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(PhysicalSlotRequest physicalSlotRequest) {
 		SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
 		requests.put(slotRequestId, physicalSlotRequest);
-		CompletableFuture<TestingPhysicalSlot> resultFuture = new CompletableFuture<>();
-		responses.put(slotRequestId, resultFuture);
-
-		switch (physicalSlotFutureCompletion) {
-			case SUCCESS:
-				if (availableSlotCount > 0) {
-					completePhysicalSlotFutureFor(slotRequestId, new AllocationID());
-				} else {
-					resultFuture.completeExceptionally(new NoResourceAvailableException());
-				}
-				break;
-			case FAILURE:
-				resultFuture.completeExceptionally(new FlinkException("Test failure."));
-				break;
+		CompletableFuture<TestingPhysicalSlot> resultFuture;
+		try {
+			resultFuture = physicalSlotCreator.apply(
+				taskManagerLocation,
+				taskManagerGateway,
+				physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile());
+		} catch (Throwable t) {
+			resultFuture = FutureUtils.completedExceptionally(t);

Review comment:
       Given that `physicalSlotCreator` can return a `CompletableFuture`, why do we need to catch exceptions here? Couldn't the creator return an exceptionally completed future? If the creator only returns a `TestingPhysicalSlot`, then this would be needed.
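
As a rough illustration of this point, the sketch below uses hypothetical stand-in types (plain strings instead of `TestingPhysicalSlot` and `ResourceProfile`, and a class named `FailedFutureSketch` that does not exist in Flink). It assumes the creator is a plain `Function` that never throws and instead reports failure through the future it returns, so the calling side needs no `try`/`catch`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch; strings stand in for the slot and resource profile types.
final class FailedFutureSketch {

    // A creator that signals failure through the returned future instead of throwing.
    static CompletableFuture<String> failingCreator(String resourceProfile) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(
            new IllegalStateException("no slot available for " + resourceProfile));
        return future;
    }

    // Provider side: the failure simply propagates through the future chain,
    // so no try/catch around the creator call is required.
    static CompletableFuture<String> allocate(
            Function<String, CompletableFuture<String>> creator,
            String resourceProfile) {
        return creator.apply(resourceProfile).thenApply(slot -> "result:" + slot);
    }
}
```

With this shape, `allocate(FailedFutureSketch::failingCreator, "small")` yields a future that is already completed exceptionally, and a creator that returns `CompletableFuture.completedFuture(...)` yields a normal result.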

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link PhysicalSlotProvider} implementation that can be used in tests.
+ */
+public class TestingPhysicalSlotProvider implements PhysicalSlotProvider {
+
+	private final Map<SlotRequestId, PhysicalSlotRequest> requests;
+	private final Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> responses;
+	private final Map<SlotRequestId, Throwable> cancellations;
+
+	private final TaskManagerLocation taskManagerLocation;
+	private final TaskManagerGateway taskManagerGateway;
+
+	private final TriFunctionWithException<TaskManagerLocation, TaskManagerGateway, ResourceProfile, CompletableFuture<TestingPhysicalSlot>, Throwable> physicalSlotCreator;
+
+	private TestingPhysicalSlotProvider(TriFunctionWithException<TaskManagerLocation, TaskManagerGateway, ResourceProfile, CompletableFuture<TestingPhysicalSlot>, Throwable> physicalSlotCreator, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway) {
+		this.physicalSlotCreator = physicalSlotCreator;
+
+		this.taskManagerLocation = taskManagerLocation;
+		this.taskManagerGateway = taskManagerGateway;
+
+		this.requests = new HashMap<>();
+		this.responses = new HashMap<>();
+		this.cancellations = new HashMap<>();
+	}
+
+	@Override
+	public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(PhysicalSlotRequest physicalSlotRequest) {
+		SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+		requests.put(slotRequestId, physicalSlotRequest);
+		CompletableFuture<TestingPhysicalSlot> resultFuture;
+		try {
+			resultFuture = physicalSlotCreator.apply(
+				taskManagerLocation,
+				taskManagerGateway,
+				physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile());

Review comment:
       I am still a bit sceptical that `taskManagerLocation` and `taskManagerGateway` belong to the `TestingPhysicalSlotProvider`. I think they rather belong to the `physicalSlotCreator`. An indicator is that we now have 2 ways of how to specify the `TaskManagerLocation`. 1) set a custom `physicalSlotCreator` which is given the location 2) set the location for the `TestingPhysicalSlotProvider`. I think this is an indicator that there is a bit of ambiguity in the separation of concerns.
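
One way to picture the suggested separation is the following sketch. It is built on hypothetical types (`SlotCreatorSketch`, its nested `Slot` and `Provider`, and strings in place of `TaskManagerLocation` and `ResourceProfile`) and is not the code from this PR. The provider knows only the creator, and the location is captured inside the creator, so there is exactly one place to specify it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch; strings stand in for location and resource profile.
final class SlotCreatorSketch {

    static final class Slot {
        final String location;
        final String resourceProfile;

        Slot(String location, String resourceProfile) {
            this.location = location;
            this.resourceProfile = resourceProfile;
        }
    }

    // The provider holds no location or gateway of its own.
    static final class Provider {
        private final Function<String, CompletableFuture<Slot>> creator;

        Provider(Function<String, CompletableFuture<Slot>> creator) {
            this.creator = creator;
        }

        CompletableFuture<Slot> allocate(String resourceProfile) {
            return creator.apply(resourceProfile);
        }
    }

    // The location is bound when the creator is built: the single place it is set.
    static Function<String, CompletableFuture<Slot>> creatorFor(String location) {
        return profile -> CompletableFuture.completedFuture(new Slot(location, profile));
    }
}
```

A test that cares about the location would pass `SlotCreatorSketch.creatorFor("tm-1")` to the provider, while other tests would use a default creator and never mention the location at all.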







[GitHub] [flink] flinkbot commented on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749574484


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 1662fceecaa769723566c9aecb36827dd5104b23 (Tue Dec 22 14:44:03 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * 1662fceecaa769723566c9aecb36827dd5104b23 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187) 
   * 2147c4b63bfdf589e2c5ab167b0c41cacc7a514c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749574484


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 5d8b123b56dc859b61490e79bd12a3fcc5abe5fc (Fri May 28 06:58:55 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * 23d374243c9726e72ec4b1e1e87739739cded4b9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342) 
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 5dfaefedb78d18a8cddda978cf121eacdb481789 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * 1662fceecaa769723566c9aecb36827dd5104b23 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187) 
   * 2147c4b63bfdf589e2c5ab167b0c41cacc7a514c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549677771



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
##########
@@ -179,10 +176,10 @@ private SchedulerBase createScheduler(
 			final JobVertex... vertices) throws Exception {
 
 		final JobGraph jobGraph = new JobGraph(new JobID(), "test job", vertices);
-		final SlotProvider slotProvider = new TestingSlotProvider(
-			ignored -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
-		final SchedulerBase scheduler = SchedulerTestingUtils
-			.newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, Time.seconds(10))
+		final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+			.setExecutionSlotAllocatorFactory(SchedulerTestingUtils.slotSharingExecutionSlotAllocatorFactoryBuilder()

Review comment:
       Fair enough. I removed it. Initially, I just thought of keeping the test code as is.







[GitHub] [flink] tillrohrmann commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549764668



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -76,8 +91,10 @@ void completePhysicalSlotFutureFor(
 		AllocationID allocationID) {
 		ResourceProfile resourceProfile = requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile();
 		SharedSlotTestingUtils.TestingPhysicalSlot physicalSlot = new SharedSlotTestingUtils.TestingPhysicalSlot(
-			resourceProfile,
-			allocationID);
+			allocationID,
+			taskManagerLocation,
+			taskManagerGateway,
+			resourceProfile);

Review comment:
       I'll check the new commits.







[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549702320



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -76,8 +91,10 @@ void completePhysicalSlotFutureFor(
 		AllocationID allocationID) {
 		ResourceProfile resourceProfile = requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile();
 		SharedSlotTestingUtils.TestingPhysicalSlot physicalSlot = new SharedSlotTestingUtils.TestingPhysicalSlot(
-			resourceProfile,
-			allocationID);
+			allocationID,
+			taskManagerLocation,
+			taskManagerGateway,
+			resourceProfile);

Review comment:
       I don't see the need for another Factory class after the refactoring. Do you agree? Or would you still prefer having a Factory class that hides the `TaskManagerLocation` and `TaskManagerGateway`?







[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549674166



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -136,6 +136,16 @@ public static DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutio
 		return newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(
 			jobGraph,
 			new TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode.SUCCESS),
+			allocationTimeout);
+	}
+
+	public static DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(

Review comment:
       Would you prefer having factory methods with different parameter combinations instead of using the Builder?







[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549691715



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
##########
@@ -130,22 +132,23 @@ public void testConstraintsAfterRestart() throws Exception {
 			timeout);
 
 		//checking execution vertex properties
-		validateConstraints(eg);
+		validateConstraints(eg, slotSharingStrategyFactory.getMostRecentStrategyInstance());
 
 		ExecutionGraphTestUtils.finishAllVertices(eg);
 
 		assertThat(eg.getState(), is(FINISHED));
 	}
 
-	private void validateConstraints(ExecutionGraph eg) {
+	private void validateConstraints(ExecutionGraph eg, SlotSharingStrategy slotSharingStrategy) {
 
 		ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
 
 		for (int i = 0; i < NUM_TASKS; i++) {
-			CoLocationConstraint constr1 = tasks[0].getTaskVertices()[i].getLocationConstraint();
-			CoLocationConstraint constr2 = tasks[1].getTaskVertices()[i].getLocationConstraint();
-			assertThat(constr1.isAssigned(), is(true));
-			assertThat(constr1.getLocation(), equalTo(constr2.getLocation()));
+			ExecutionVertexID executionVertexID0 = tasks[0].getTaskVertices()[i].getID();
+			ExecutionVertexID executionVertexID1 = tasks[1].getTaskVertices()[i].getID();
+
+			assertThat(slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID0).getExecutionVertexIds(), hasItem(executionVertexID1));
+			assertThat(slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID1).getExecutionVertexIds(), hasItem(executionVertexID0));

Review comment:
       Great. That fixes the visibility changes 👍 
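
For readers following the diff: the rewritten `validateConstraints` no longer compares constraint locations but checks that each vertex appears in the slot sharing group resolved for the other one. A minimal sketch of that mutual-membership check with plain JDK collections follows. `CoLocationCheckSketch`, `shareGroup`, and the string vertex IDs are hypothetical stand-ins for illustration, not Flink classes.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the slot sharing lookup; not part of Flink.
final class CoLocationCheckSketch {

    // Returns true only if each vertex appears in the group resolved for the other one.
    static boolean shareGroup(
            Map<String, Set<String>> groupByVertex, String first, String second) {
        Set<String> groupOfFirst = groupByVertex.get(first);
        Set<String> groupOfSecond = groupByVertex.get(second);
        return groupOfFirst != null
                && groupOfSecond != null
                && groupOfFirst.contains(second)
                && groupOfSecond.contains(first);
    }

    public static void main(String[] args) {
        // Both vertices resolve to the same group, as co-located subtasks would.
        Set<String> group = Set.of("source_0", "sink_0");
        Map<String, Set<String>> groupByVertex = Map.of("source_0", group, "sink_0", group);
        System.out.println(shareGroup(groupByVertex, "source_0", "sink_0"));
    }
}
```

Checking both directions, as the test in the diff does, guards against a lookup that returns a group for one vertex that the other vertex does not resolve to.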







[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * 1662fceecaa769723566c9aecb36827dd5104b23 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 51dad931454e096b565e35daf9342209679271bf Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11470) 
   * 5d8b123b56dc859b61490e79bd12a3fcc5abe5fc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11471) 
   



[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r550201079



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+/** A {@link PhysicalSlotProvider} implementation that can be used in tests. */
+public class TestingPhysicalSlotProvider implements PhysicalSlotProvider {
+
+    private final Map<SlotRequestId, PhysicalSlotRequest> requests;
+    private final Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> responses;
+    private final Map<SlotRequestId, Throwable> cancellations;
+
+    private final Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>>
+            physicalSlotCreator;
+
+    public static TestingPhysicalSlotProvider create(
+            Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>> physicalSlotCreator) {
+        return new TestingPhysicalSlotProvider(physicalSlotCreator);
+    }
+
+    public static TestingPhysicalSlotProvider createWithInfiniteSlotCreation() {
+        return create(
+                (resourceProfile) ->
+                        CompletableFuture.completedFuture(
+                                new TestingPhysicalSlot(resourceProfile, new AllocationID())));
+    }
+
+    public static TestingPhysicalSlotProvider createWithoutImmediatePhysicalSlotCreation() {
+        return create((ignored) -> new CompletableFuture<>());
+    }
+
+    public static TestingPhysicalSlotProvider createWithFailingPhysicalSlotCreation(Throwable t) {
+        return create((ignored) -> FutureUtils.completedExceptionally(t));
+    }
+
+    public static TestingPhysicalSlotProvider createWithLimitedAmountOfPhysicalSlots(
+            int slotCount) {
+        return createWithLimitedAmountOfPhysicalSlots(
+                slotCount, new SimpleAckingTaskManagerGateway());
+    }
+
+    public static TestingPhysicalSlotProvider createWithLimitedAmountOfPhysicalSlots(
+            int slotCount, TaskManagerGateway taskManagerGateway) {
+        AtomicInteger availableSlotCount = new AtomicInteger(slotCount);
+        return create(
+                (resourceProfile) -> {
+                    int count = availableSlotCount.getAndDecrement();
+                    if (count > 0) {
+                        return CompletableFuture.completedFuture(
+                                TestingPhysicalSlot.builder()
+                                        .withResourceProfile(resourceProfile)
+                                        .withTaskManagerGateway(taskManagerGateway)
+                                        .build());
+                    } else {
+                        return FutureUtils.completedExceptionally(
+                                new NoResourceAvailableException(
+                                        String.format(
+                                                "The limit of %d provided slots was reached. No available slots can be provided.",
+                                                slotCount)));
+                    }
+                });
+    }
+
+    private TestingPhysicalSlotProvider(
+            Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>> physicalSlotCreator) {
+        this.physicalSlotCreator = physicalSlotCreator;
+
+        this.requests = new HashMap<>();
+        this.responses = new HashMap<>();
+        this.cancellations = new HashMap<>();
+    }
+
+    @Override
+    public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
+            PhysicalSlotRequest physicalSlotRequest) {
+        SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+        requests.put(slotRequestId, physicalSlotRequest);
+        CompletableFuture<TestingPhysicalSlot> resultFuture;
+        try {
+            resultFuture =
+                    physicalSlotCreator.apply(
+                            physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile());
+        } catch (Throwable t) {
+            resultFuture = FutureUtils.completedExceptionally(t);
+        }

Review comment:
       Ah thanks for removing it. That one I missed accidentally... 👍 
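
As a reading aid for `createWithLimitedAmountOfPhysicalSlots` in the diff above, the counting pattern can be sketched with JDK types only. This is an illustration under stated assumptions, not the class from the PR: `LimitedSlotCreatorSketch` and `limited` are hypothetical names, strings stand in for resource profiles and slots, and `IllegalStateException` is swapped in for Flink's `NoResourceAvailableException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical, JDK-only illustration of a slot creator with a fixed budget.
final class LimitedSlotCreatorSketch {

    static Function<String, CompletableFuture<String>> limited(int slotCount) {
        // Shared counter captured by the lambda; each request consumes one slot.
        AtomicInteger available = new AtomicInteger(slotCount);
        return profile -> {
            if (available.getAndDecrement() > 0) {
                return CompletableFuture.completedFuture("slot-for-" + profile);
            }
            // Budget exhausted: report the failure through the future, not by throwing.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException(
                            String.format(
                                    "The limit of %d provided slots was reached.", slotCount)));
            return failed;
        };
    }

    public static void main(String[] args) {
        Function<String, CompletableFuture<String>> creator = limited(1);
        System.out.println(creator.apply("small").join());
        System.out.println(creator.apply("small").isCompletedExceptionally());
    }
}
```

Returning an exceptionally completed future keeps the failure on the same asynchronous path as a successful allocation, so callers handle both cases through the future.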







[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 353160767d25e932e5f650a335574ca2eb9a88e1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458) 
   



[GitHub] [flink] XComp commented on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-751693135


   > I think the refactoring looks good to me. One thing I was wondering is whether we can make the configuration of the `SlotSharingExecutionSlotAllocator` a bit easier or whether the nested builder is good enough for it.
   
   I counted six different parameter combinations that would need to be covered:
   - `()` - default
   - `(PhysicalSlotProvider, allocationTimeout:Time)`
   - `(SlotSharingStrategyFactory)`
   - `(availableSlotCount:int)`
   - `(PhysicalSlotProvider)`
   - `(allocationTimeout:Time)`
   
   Hence, I considered the builder easier to read.
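
To illustrate the trade-off: with four optional settings, covering the combinations listed above through overloads would need one factory method per combination, whereas a builder lets each call site set only what it cares about. The sketch below is hypothetical and is not the builder from this PR. `AllocatorTestConfigSketch`, its field names, its string placeholders, and its default values are all assumptions made for illustration.

```java
import java.time.Duration;

// Hypothetical configuration holder for illustration; not Flink's test utilities.
final class AllocatorTestConfigSketch {
    final String physicalSlotProvider;
    final String slotSharingStrategyFactory;
    final int availableSlotCount;
    final Duration allocationTimeout;

    private AllocatorTestConfigSketch(Builder builder) {
        this.physicalSlotProvider = builder.physicalSlotProvider;
        this.slotSharingStrategyFactory = builder.slotSharingStrategyFactory;
        this.availableSlotCount = builder.availableSlotCount;
        this.allocationTimeout = builder.allocationTimeout;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Defaults are assumptions chosen for this sketch.
        private String physicalSlotProvider = "default-provider";
        private String slotSharingStrategyFactory = "default-strategy";
        private int availableSlotCount = Integer.MAX_VALUE;
        private Duration allocationTimeout = Duration.ofSeconds(10);

        Builder withPhysicalSlotProvider(String value) {
            this.physicalSlotProvider = value;
            return this;
        }

        Builder withSlotSharingStrategyFactory(String value) {
            this.slotSharingStrategyFactory = value;
            return this;
        }

        Builder withAvailableSlotCount(int value) {
            this.availableSlotCount = value;
            return this;
        }

        Builder withAllocationTimeout(Duration value) {
            this.allocationTimeout = value;
            return this;
        }

        AllocatorTestConfigSketch build() {
            return new AllocatorTestConfigSketch(this);
        }
    }

    public static void main(String[] args) {
        // Only the timeout is customized; everything else keeps its default.
        AllocatorTestConfigSketch config =
                newBuilder().withAllocationTimeout(Duration.ofSeconds(1)).build();
        System.out.println(config.allocationTimeout + " " + config.availableSlotCount);
    }
}
```

A call site that needs the `(PhysicalSlotProvider, allocationTimeout)` combination chains two `with...` calls, and no additional overload has to be added when a new combination shows up in a test.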





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 5d8b123b56dc859b61490e79bd12a3fcc5abe5fc Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11471) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * 2147c4b63bfdf589e2c5ab167b0c41cacc7a514c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260) 
   * 23d374243c9726e72ec4b1e1e87739739cded4b9 UNKNOWN
   



[GitHub] [flink] tillrohrmann closed pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
tillrohrmann closed pull request #14465:
URL: https://github.com/apache/flink/pull/14465


   





[GitHub] [flink] XComp edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-751693135


   > I think the refactoring looks good to me. One thing I was wondering is whether we can make the configuration of the `SlotSharingExecutionSlotAllocator` a bit easier or whether the nested builder is good enough for it.
   
   I counted six different parameter combinations that would need to be covered:
   - `()` (i.e. no parameters for customization)
   - `(PhysicalSlotProvider, allocationTimeout:Time)`
   - `(SlotSharingStrategyFactory)`
   - `(availableSlotCount:int)`
   - `(PhysicalSlotProvider)`
   - `(allocationTimeout:Time)`
   
   Hence, I considered the builder easier to read.





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   ## CI report:
   
   * 23d374243c9726e72ec4b1e1e87739739cded4b9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342) 
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 5dfaefedb78d18a8cddda978cf121eacdb481789 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378) 
   * 6898634d33237f76e7197479432fd7840f3a8a83 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378",
       "triggerID" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384",
       "triggerID" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "triggerType" : "PUSH"
     }, {
       "hash" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458",
       "triggerID" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f71cb168af01508ae680a5353e8c159015275f42",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f71cb168af01508ae680a5353e8c159015275f42",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 353160767d25e932e5f650a335574ca2eb9a88e1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458) 
   * f71cb168af01508ae680a5353e8c159015275f42 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549672139



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
##########
@@ -413,7 +411,10 @@ private static ResourceProfile fulfilOneOfTwoSlotRequestsAndGetPendingProfile(
 		return requests.get(slotRequestId2).getSlotProfile().getPhysicalSlotResourceProfile();
 	}
 
-	private enum PhysicalSlotFutureCompletionMode {
+	/**
+	 * Enum for covering different ways of a Future to complete.
+	 */
+	public enum PhysicalSlotFutureCompletionMode {
 		SUCCESS,
 		FAILURE,
 		MANUAL,

Review comment:
       I removed it entirely.







[GitHub] [flink] tillrohrmann commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r550181254



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+/** A {@link PhysicalSlotProvider} implementation that can be used in tests. */
+public class TestingPhysicalSlotProvider implements PhysicalSlotProvider {
+
+    private final Map<SlotRequestId, PhysicalSlotRequest> requests;
+    private final Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> responses;
+    private final Map<SlotRequestId, Throwable> cancellations;
+
+    private final Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>>
+            physicalSlotCreator;
+
+    public static TestingPhysicalSlotProvider create(
+            Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>> physicalSlotCreator) {
+        return new TestingPhysicalSlotProvider(physicalSlotCreator);
+    }
+
+    public static TestingPhysicalSlotProvider createWithInfiniteSlotCreation() {
+        return create(
+                (resourceProfile) ->
+                        CompletableFuture.completedFuture(
+                                new TestingPhysicalSlot(resourceProfile, new AllocationID())));
+    }
+
+    public static TestingPhysicalSlotProvider createWithoutImmediatePhysicalSlotCreation() {
+        return create((ignored) -> new CompletableFuture<>());
+    }
+
+    public static TestingPhysicalSlotProvider createWithFailingPhysicalSlotCreation(Throwable t) {
+        return create((ignored) -> FutureUtils.completedExceptionally(t));
+    }
+
+    public static TestingPhysicalSlotProvider createWithLimitedAmountOfPhysicalSlots(
+            int slotCount) {
+        return createWithLimitedAmountOfPhysicalSlots(
+                slotCount, new SimpleAckingTaskManagerGateway());
+    }
+
+    public static TestingPhysicalSlotProvider createWithLimitedAmountOfPhysicalSlots(
+            int slotCount, TaskManagerGateway taskManagerGateway) {
+        AtomicInteger availableSlotCount = new AtomicInteger(slotCount);
+        return create(
+                (resourceProfile) -> {
+                    int count = availableSlotCount.getAndDecrement();
+                    if (count > 0) {
+                        return CompletableFuture.completedFuture(
+                                TestingPhysicalSlot.builder()
+                                        .withResourceProfile(resourceProfile)
+                                        .withTaskManagerGateway(taskManagerGateway)
+                                        .build());
+                    } else {
+                        return FutureUtils.completedExceptionally(
+                                new NoResourceAvailableException(
+                                        String.format(
+                                                "The limit of %d provided slots was reached. No available slots can be provided.",
+                                                slotCount)));
+                    }
+                });
+    }
+
+    private TestingPhysicalSlotProvider(
+            Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>> physicalSlotCreator) {
+        this.physicalSlotCreator = physicalSlotCreator;
+
+        this.requests = new HashMap<>();
+        this.responses = new HashMap<>();
+        this.cancellations = new HashMap<>();
+    }
+
+    @Override
+    public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
+            PhysicalSlotRequest physicalSlotRequest) {
+        SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+        requests.put(slotRequestId, physicalSlotRequest);
+        CompletableFuture<TestingPhysicalSlot> resultFuture;
+        try {
+            resultFuture =
+                    physicalSlotCreator.apply(
+                            physicalSlotRequest.getSlotProfile().getPhysicalSlotResourceProfile());
+        } catch (Throwable t) {
+            resultFuture = FutureUtils.completedExceptionally(t);
+        }

Review comment:
       Do we still need this catch block?
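
For context on the question above: the guard only matters if the slot creator can throw synchronously instead of reporting the failure through the future it returns. The following is a minimal sketch using plain JDK types; `CreatorInvocationSketch`, `failed`, and `invokeGuarded` are hypothetical names chosen for illustration and are not part of the Flink code base.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration, not part of the Flink code base.
final class CreatorInvocationSketch {

    // Java 8 compatible helper that returns an already failed future.
    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    // Invokes the creator and converts a synchronous throw into a failed future,
    // so that callers only ever have to deal with the returned future.
    static <I, O> CompletableFuture<O> invokeGuarded(
            Function<I, CompletableFuture<O>> creator, I input) {
        try {
            return creator.apply(input);
        } catch (Throwable t) {
            return failed(t);
        }
    }

    public static void main(String[] args) {
        // Reports the failure through the future: the guard changes nothing here.
        Function<String, CompletableFuture<Integer>> wellBehaved =
                ignored -> failed(new IllegalStateException("no slot"));
        // Throws directly: without the guard the caller would see the exception itself.
        Function<String, CompletableFuture<Integer>> throwing =
                ignored -> {
                    throw new IllegalStateException("no slot");
                };

        System.out.println(invokeGuarded(wellBehaved, "a").isCompletedExceptionally());
        System.out.println(invokeGuarded(throwing, "b").isCompletedExceptionally());
    }
}
```

If every creator passed to the provider is known to return a failed future rather than throw, the try/catch is redundant; it is only a safety net for creators that throw.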







[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2147c4b63bfdf589e2c5ab167b0c41cacc7a514c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260) 
   * 23d374243c9726e72ec4b1e1e87739739cded4b9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] XComp edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-751693135


   > I think the refactoring looks good to me. One thing I was wondering is whether we can make the configuration of the `SlotSharingExecutionSlotAllocator` a bit easier, or whether the nested builder is good enough for it.
   
   I counted 6 different parameter combinations that would need to be covered:
   - `()` (i.e. no parameters for customization)
   - `(physicalSlotProvider, allocationTimeout)`
   - `(slotSharingStrategyFactory)`
   - `(availableSlotCount:int)`
   - `(physicalSlotProvider)`
   - `(allocationTimeout)`
   
   Hence, I considered the builder easier to read.
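
To illustrate the trade-off, here is a minimal sketch of a builder with defaults, where each test overrides only the parameters it cares about. `AllocatorTestConfig` and its `String` and `Duration` fields are simplified, hypothetical stand-ins for the real test types; the default values are assumptions made for the example.

```java
import java.time.Duration;

// Hypothetical stand-in for a test configuration. The field names mirror the
// parameters listed above, but the types are simplified placeholders.
final class AllocatorTestConfig {
    final String physicalSlotProvider;
    final String slotSharingStrategyFactory;
    final Duration allocationTimeout;
    final int availableSlotCount;

    private AllocatorTestConfig(Builder builder) {
        this.physicalSlotProvider = builder.physicalSlotProvider;
        this.slotSharingStrategyFactory = builder.slotSharingStrategyFactory;
        this.allocationTimeout = builder.allocationTimeout;
        this.availableSlotCount = builder.availableSlotCount;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Assumed defaults, used whenever a test does not customize a parameter.
        private String physicalSlotProvider = "default-provider";
        private String slotSharingStrategyFactory = "default-strategy";
        private Duration allocationTimeout = Duration.ofSeconds(10);
        private int availableSlotCount = 1;

        Builder withPhysicalSlotProvider(String physicalSlotProvider) {
            this.physicalSlotProvider = physicalSlotProvider;
            return this;
        }

        Builder withSlotSharingStrategyFactory(String slotSharingStrategyFactory) {
            this.slotSharingStrategyFactory = slotSharingStrategyFactory;
            return this;
        }

        Builder withAllocationTimeout(Duration allocationTimeout) {
            this.allocationTimeout = allocationTimeout;
            return this;
        }

        Builder withAvailableSlotCount(int availableSlotCount) {
            this.availableSlotCount = availableSlotCount;
            return this;
        }

        AllocatorTestConfig build() {
            return new AllocatorTestConfig(this);
        }
    }

    public static void main(String[] args) {
        // Covers the "(physicalSlotProvider, allocationTimeout)" combination;
        // the remaining parameters keep their defaults.
        AllocatorTestConfig config =
                AllocatorTestConfig.newBuilder()
                        .withPhysicalSlotProvider("testing-provider")
                        .withAllocationTimeout(Duration.ofMillis(100))
                        .build();
        System.out.println(config.physicalSlotProvider + " / " + config.allocationTimeout);
    }
}
```

With a builder, all six combinations are served by four `with...` methods; the equivalent factory-method approach would need one overload per combination.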





[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549674166



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -136,6 +136,16 @@ public static DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutio
 		return newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(
 			jobGraph,
 			new TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode.SUCCESS),
+			allocationTimeout);
+	}
+
+	public static DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(

Review comment:
       ~Would you prefer having factory methods with different parameter combinations instead of using the Builder?~
   I realized that we talked about different methods here. I switched back to factory methods instead of builders in the most recent refactoring.







[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549694571



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -196,9 +201,15 @@ public Builder withAvailableSlotCount(int availableSlotCount) {
 			return this;
 		}
 
+		public Builder withPhysicalSlotCreator(QuinConsumer<CompletableFuture<TestingPhysicalSlot>, AllocationID, TaskManagerLocation, TaskManagerGateway, ResourceProfile> physicalSlotCreator) {

Review comment:
       I switched to `TriFunctionWithException` now.
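
As a rough illustration of why a throwing three-argument function reads better than a five-argument consumer that completes a future as a side effect: the creator simply returns a value or throws, and the caller decides how to wrap the outcome. The sketch below declares its own `ThrowingTriFunction` interface instead of depending on Flink's `TriFunctionWithException`; the interface, `callAsFuture`, and `describeSlot` are hypothetical names, and the `String` and `Integer` arguments are placeholders for the real slot types.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration, not part of the Flink code base.
final class ThrowingCreatorSketch {

    // Local stand-in for a three-argument function that may throw a checked exception.
    @FunctionalInterface
    interface ThrowingTriFunction<A, B, C, R, E extends Throwable> {
        R apply(A a, B b, C c) throws E;
    }

    // Calls the creator and maps "returned a value" / "threw" onto a future.
    static <A, B, C, R, E extends Throwable> CompletableFuture<R> callAsFuture(
            ThrowingTriFunction<A, B, C, R, E> creator, A a, B b, C c) {
        CompletableFuture<R> result = new CompletableFuture<>();
        try {
            result.complete(creator.apply(a, b, c));
        } catch (Throwable t) {
            result.completeExceptionally(t);
        }
        return result;
    }

    // Placeholder creator: builds a description string or throws for invalid input.
    static String describeSlot(String allocationId, String location, Integer cpuCores)
            throws Exception {
        if (cpuCores <= 0) {
            throw new Exception("No resources requested for " + allocationId);
        }
        return allocationId + "@" + location + "/" + cpuCores;
    }

    public static void main(String[] args) {
        System.out.println(
                callAsFuture(ThrowingCreatorSketch::describeSlot, "alloc-1", "host-a", 4).join());
        System.out.println(
                callAsFuture(ThrowingCreatorSketch::describeSlot, "alloc-2", "host-a", 0)
                        .isCompletedExceptionally());
    }
}
```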







[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378",
       "triggerID" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384",
       "triggerID" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "triggerType" : "PUSH"
     }, {
       "hash" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458",
       "triggerID" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f71cb168af01508ae680a5353e8c159015275f42",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11463",
       "triggerID" : "f71cb168af01508ae680a5353e8c159015275f42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "51dad931454e096b565e35daf9342209679271bf",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11470",
       "triggerID" : "51dad931454e096b565e35daf9342209679271bf",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 353160767d25e932e5f650a335574ca2eb9a88e1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458) 
   * f71cb168af01508ae680a5353e8c159015275f42 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11463) 
   * 51dad931454e096b565e35daf9342209679271bf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11470) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] XComp commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549678069



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -85,13 +95,16 @@ private TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.Physic
 	void completePhysicalSlotFutureFor(
 		SlotRequestId slotRequestId,
 		AllocationID allocationID) {
-		ResourceProfile resourceProfile = requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile();
-		SharedSlotTestingUtils.TestingPhysicalSlot physicalSlot = new SharedSlotTestingUtils.TestingPhysicalSlot(
-			allocationID,
-			taskManagerLocation,
-			taskManagerGateway,
-			resourceProfile);
-		responses.get(slotRequestId).complete(physicalSlot);
+		synchronized (lock) {

Review comment:
       I removed the concurrency handling as part of a larger refactoring of the `TestingPhysicalSlotProvider`.







[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378",
       "triggerID" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384",
       "triggerID" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "triggerType" : "PUSH"
     }, {
       "hash" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458",
       "triggerID" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f71cb168af01508ae680a5353e8c159015275f42",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11463",
       "triggerID" : "f71cb168af01508ae680a5353e8c159015275f42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "51dad931454e096b565e35daf9342209679271bf",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11470",
       "triggerID" : "51dad931454e096b565e35daf9342209679271bf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5d8b123b56dc859b61490e79bd12a3fcc5abe5fc",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5d8b123b56dc859b61490e79bd12a3fcc5abe5fc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 353160767d25e932e5f650a335574ca2eb9a88e1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458) 
   * f71cb168af01508ae680a5353e8c159015275f42 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11463) 
   * 51dad931454e096b565e35daf9342209679271bf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11470) 
   * 5d8b123b56dc859b61490e79bd12a3fcc5abe5fc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378",
       "triggerID" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384",
       "triggerID" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "triggerType" : "PUSH"
     }, {
       "hash" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11458",
       "triggerID" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f71cb168af01508ae680a5353e8c159015275f42",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11463",
       "triggerID" : "f71cb168af01508ae680a5353e8c159015275f42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "51dad931454e096b565e35daf9342209679271bf",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11470",
       "triggerID" : "51dad931454e096b565e35daf9342209679271bf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5d8b123b56dc859b61490e79bd12a3fcc5abe5fc",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11471",
       "triggerID" : "5d8b123b56dc859b61490e79bd12a3fcc5abe5fc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * f71cb168af01508ae680a5353e8c159015275f42 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11463) 
   * 51dad931454e096b565e35daf9342209679271bf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11470) 
   * 5d8b123b56dc859b61490e79bd12a3fcc5abe5fc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11471) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1662fceecaa769723566c9aecb36827dd5104b23 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1662fceecaa769723566c9aecb36827dd5104b23 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] tillrohrmann commented on a change in pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549400907



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
##########
@@ -413,7 +411,10 @@ private static ResourceProfile fulfilOneOfTwoSlotRequestsAndGetPendingProfile(
 		return requests.get(slotRequestId2).getSlotProfile().getPhysicalSlotResourceProfile();
 	}
 
-	private enum PhysicalSlotFutureCompletionMode {
+	/**
+	 * Enum for covering different ways of a Future to complete.
+	 */
+	public enum PhysicalSlotFutureCompletionMode {
 		SUCCESS,
 		FAILURE,
 		MANUAL,

Review comment:
       Hmm, shouldn't this enum be part of the `TestingPhysicalSlotProvider.java` file?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
##########
@@ -215,22 +182,12 @@ public void testCanceledExecutionReturnsSlot() throws Exception {
 			}
 		);
 
-		slotRequestIdFuture.thenAcceptAsync(
-			(SlotRequestId slotRequestId) -> {
-				final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(
-					slotOwner,
-					taskManagerGateway,
-					slotRequestId);
-				slotProvider.complete(slotRequestId, singleLogicalSlot);
-			},
-			testMainThreadUtil.getMainThreadExecutor());
-
 		testMainThreadUtil.execute(scheduler::startScheduling);
 
 		// cancel the execution in case we could schedule the execution
 		testMainThreadUtil.execute(execution::cancel);
 
-		assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get())));
+		assertThat(physicalSlotProvider.getRequests().keySet(), is(physicalSlotProvider.getCancellations().keySet()));

Review comment:
       Much nicer :-)

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -136,6 +136,16 @@ public static DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutio
 		return newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(
 			jobGraph,
 			new TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode.SUCCESS),
+			allocationTimeout);
+	}
+
+	public static DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(

Review comment:
       This method could be used in `ExecutionPartitionLifecycleTest.java` instead.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -401,6 +401,58 @@ public static ExecutionAttemptID getAttemptId(DefaultScheduler scheduler, JobVer
 		}
 	}
 
+	public static SlotSharingExecutionSlotAllocatorFactoryBuilder slotSharingExecutionSlotAllocatorFactoryBuilder() {

Review comment:
       Maybe name it `newSlotSharingExecutionSlotAllocatorFactoryBuilder`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -76,8 +91,10 @@ void completePhysicalSlotFutureFor(
 		AllocationID allocationID) {
 		ResourceProfile resourceProfile = requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile();
 		SharedSlotTestingUtils.TestingPhysicalSlot physicalSlot = new SharedSlotTestingUtils.TestingPhysicalSlot(
-			resourceProfile,
-			allocationID);
+			allocationID,
+			taskManagerLocation,
+			taskManagerGateway,
+			resourceProfile);

Review comment:
       Nit: Maybe it would be simpler to provide a `TestingPhysicalSlotFactory`. Then, the `TestingPhysicalSlotProvider` wouldn't have to know about the `taskManagerLocation` and the `taskManagerGateway`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
##########
@@ -160,14 +157,6 @@ private void finishExecution(
 		).join();
 	}
 
-	@Nonnull
-	private PhysicalSlotProvider createSlotProvider(SlotPool slotPool, ComponentMainThreadExecutor mainThreadExecutor) {
-		PhysicalSlotProviderImpl slotProvider = new PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
-		// scheduler.start(mainThreadExecutor);

Review comment:
       Yes, it should be fine to remove the `start` call, as it only sets the main thread executor of the old `Scheduler` implementations.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
##########
@@ -161,11 +161,11 @@ private void finishExecution(
 	}
 
 	@Nonnull
-	private SlotProvider createSlotProvider(SlotPool slotPool, ComponentMainThreadExecutor mainThreadExecutor) {
-		final SchedulerImpl scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
-		scheduler.start(mainThreadExecutor);
+	private PhysicalSlotProvider createSlotProvider(SlotPool slotPool, ComponentMainThreadExecutor mainThreadExecutor) {
+		PhysicalSlotProviderImpl slotProvider = new PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
+		// scheduler.start(mainThreadExecutor);

Review comment:
       Can this be removed?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
##########
@@ -118,7 +118,7 @@ public void testSchedulingOfJobWithFewerSlotsThanParallelism() throws Exception
 				Collections.singletonList(ResourceProfile.ANY),
 				new RpcTaskManagerGateway(testingTaskExecutorGateway, JobMasterId.generate()));
 
-			final SlotProvider slotProvider = createSlotProvider(slotPool, mainThreadExecutor);
+			final PhysicalSlotProvider slotProvider = createSlotProvider(slotPool, mainThreadExecutor);

Review comment:
       Maybe rename `createSlotProvider` to `createPhysicalSlotProvider`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -85,13 +95,16 @@ private TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.Physic
 	void completePhysicalSlotFutureFor(
 		SlotRequestId slotRequestId,
 		AllocationID allocationID) {
-		ResourceProfile resourceProfile = requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile();
-		SharedSlotTestingUtils.TestingPhysicalSlot physicalSlot = new SharedSlotTestingUtils.TestingPhysicalSlot(
-			allocationID,
-			taskManagerLocation,
-			taskManagerGateway,
-			resourceProfile);
-		responses.get(slotRequestId).complete(physicalSlot);
+		synchronized (lock) {

Review comment:
       If concurrency is a problem, shouldn't we also synchronize `allocatePhysicalSlot`?
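
One way to read this question, as a minimal sketch: guard both the registration of a request and its completion with the same lock. The class below is a hypothetical, simplified stand-in (plain `String` IDs, no Flink types, invented class name); it is not the actual `TestingPhysicalSlotProvider`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a test slot provider; names and types are assumptions. */
final class SynchronizedProviderSketch {
    private final Object lock = new Object();
    private final Map<String, CompletableFuture<String>> responses = new HashMap<>();
    private int availableSlotCount;

    SynchronizedProviderSketch(int availableSlotCount) {
        this.availableSlotCount = availableSlotCount;
    }

    /** Registers a pending request under the same lock that completion uses. */
    CompletableFuture<String> allocatePhysicalSlot(String slotRequestId) {
        synchronized (lock) {
            CompletableFuture<String> response = new CompletableFuture<>();
            responses.put(slotRequestId, response);
            return response;
        }
    }

    /** Completes a previously registered request and decrements the slot count. */
    void completePhysicalSlotFutureFor(String slotRequestId, String allocationId) {
        synchronized (lock) {
            CompletableFuture<String> response = responses.get(slotRequestId);
            if (response == null) {
                throw new IllegalStateException("Unknown request: " + slotRequestId);
            }
            response.complete(allocationId);
            availableSlotCount--;
        }
    }

    int getAvailableSlotCount() {
        synchronized (lock) {
            return availableSlotCount;
        }
    }
}
```

With both methods holding `lock`, the map of pending responses and the counter are only ever touched by one thread at a time, so the counter would no longer need to be `volatile` in this simplified form.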

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -132,4 +132,47 @@ PhysicalSlotRequest getFirstRequestOrFail() {
 		Preconditions.checkState(element.isPresent());
 		return element.get();
 	}
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * {@code Builder} for creating {@code TestingPhysicalSlotProvider} instances.
+	 */
+	public static class Builder {

Review comment:
       Nice :-)

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -125,19 +129,23 @@ PhysicalSlotRequest getFirstRequestOrFail() {
 		return getFirstElementOrFail(requests.values());
 	}
 
-	Map<SlotRequestId, PhysicalSlotRequest> getRequests() {
+	public CountDownLatch getAllocateLatch() {
+		return allocateLatch;
+	}

Review comment:
       If it is not important that we are using a latch, we could hide this detail by renaming this method to `waitUntilAllSlotsAreSuccessfullyAllocated` and then calling `allocateLatch.await()` inside this method.
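
A minimal sketch of that idea, assuming a simplified, hypothetical provider class (not the Flink one). The timeout parameters and the `onSlotAllocated` hook are assumptions added for illustration; the comment above only mentions a plain `allocateLatch.await()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in showing how the latch can stay a private detail. */
final class LatchHidingProviderSketch {
    private final CountDownLatch allocateLatch;

    LatchHidingProviderSketch(int expectedAllocations) {
        this.allocateLatch = new CountDownLatch(expectedAllocations);
    }

    /** Called once per successfully fulfilled slot request. */
    void onSlotAllocated() {
        allocateLatch.countDown();
    }

    /**
     * Waits until every expected slot has been allocated.
     * Returns false if the timeout elapses or the thread is interrupted.
     */
    boolean waitUntilAllSlotsAreSuccessfullyAllocated(long timeout, TimeUnit unit) {
        try {
            return allocateLatch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Callers then depend only on the waiting method, so the latch could later be swapped for a future or a counter without touching the tests.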

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -104,6 +107,7 @@ void completePhysicalSlotFutureFor(
 				resourceProfile);
 			responses.get(slotRequestId).complete(physicalSlot);
 			availableSlotCount--;
+			allocateLatch.countDown();

Review comment:
       What are the exact semantics of the `allocateLatch`? Is it triggered once all slot requests have been successfully fulfilled?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
##########
@@ -578,21 +576,23 @@ public void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception {
 			return CompletableFuture.completedFuture(Acknowledge.get());
 		});
 
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
 		final TestingTaskExecutorGateway taskExecutorGateway = testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway();
 		final RpcTaskManagerGateway taskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, JobMasterId.generate());
 
-		final Collection<CompletableFuture<LogicalSlot>> slotFutures = new ArrayList<>(numberTasks);
-		for (int i = 0; i < numberTasks; i++) {
-			slotFutures.add(new CompletableFuture<>());
-		}
-
-		final SlotProvider slotProvider = new IteratorTestingSlotProvider(slotFutures.iterator());
+		final Collection<CompletableFuture<TestingPhysicalSlot>> slotFutures = new ArrayList<>(numberTasks);
 
 		final JobGraph jobGraph = new JobGraph(jobId, "Test Job", sourceVertex, sinkVertex);
 		jobGraph.setScheduleMode(ScheduleMode.EAGER);
 
-		final SchedulerBase scheduler = SchedulerTestingUtils
-			.newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider)
+		final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+			.setExecutionSlotAllocatorFactory(SchedulerTestingUtils.slotSharingExecutionSlotAllocatorFactoryBuilder()
+				.withSlotProvider(TestingPhysicalSlotProvider.builder()
+					.withPhysicalSlotCreator((f, a, tml, tmg, r) -> slotFutures.add(f))

Review comment:
       Ah ok, now I see why you have introduced the consumer. I am not 100% sold that this is the best way to achieve it. Maybe we can collect all the `SlotRequestIds` from the `TestingPhysicalSlotProvider` and then call `TestingPhysicalSlotProvider.completePhysicalSlotFutureFor` in a random order?
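
The "collect the request IDs, then complete in random order" idea could look roughly like the sketch below. It is generic on purpose: the helper class, its method name, and the `slotForRequest` function are hypothetical, and the map of pending futures stands in for whatever the test provider tracks internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper; not part of Flink's test utilities. */
final class RandomOrderCompletionSketch {
    private RandomOrderCompletionSketch() {}

    /**
     * Completes every pending response in a shuffled order and returns that order,
     * so a test can log or assert on it.
     */
    static <K, V> List<K> completeInRandomOrder(
            Map<K, CompletableFuture<V>> pendingResponses,
            Function<K, V> slotForRequest,
            Random random) {
        // Copy the keys first so shuffling does not touch the provider's map.
        List<K> order = new ArrayList<>(pendingResponses.keySet());
        Collections.shuffle(order, random);
        for (K requestId : order) {
            pendingResponses.get(requestId).complete(slotForRequest.apply(requestId));
        }
        return order;
    }
}
```

Passing a seeded `Random` keeps a failing order reproducible, which tends to matter for tests that check deployment order.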

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -56,8 +57,11 @@
 	private final CountDownLatch allocateLatch;
 	private volatile int availableSlotCount;
 
-	private TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode physicalSlotFutureCompletion, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, int availableSlotCount) {
+	private final QuinConsumer<CompletableFuture<TestingPhysicalSlot>, AllocationID, TaskManagerLocation, TaskManagerGateway, ResourceProfile> physicalSlotCreator;

Review comment:
       Instead of completing the future inside the consumer, we could also introduce a `TestingPhysicalSlot` supplier which creates the `TestingPhysicalSlot` and let the `TestingPhysicalSlotProvider` decide where to register it.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -196,9 +201,15 @@ public Builder withAvailableSlotCount(int availableSlotCount) {
 			return this;
 		}
 
+		public Builder withPhysicalSlotCreator(QuinConsumer<CompletableFuture<TestingPhysicalSlot>, AllocationID, TaskManagerLocation, TaskManagerGateway, ResourceProfile> physicalSlotCreator) {

Review comment:
       I would suggest not using side effects to transfer the created `TestingPhysicalSlot` out of the `physicalSlotCreator`, and instead having something like `BiFunction<AllocationID, ResourceProfile, TestingPhysicalSlot>`, potentially even a `BiFunctionWithException` if we also want to allow the creation method to fail.
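
A rough sketch of the function-based variant, using the JDK's `java.util.function.BiFunction`. Everything else here is a hypothetical stand-in: `SlotSketch` replaces `TestingPhysicalSlot`, plain strings replace `AllocationID` and `ResourceProfile`, and the provider class is invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical stand-in for a created slot. */
final class SlotSketch {
    final String allocationId;
    final String resourceProfile;

    SlotSketch(String allocationId, String resourceProfile) {
        this.allocationId = allocationId;
        this.resourceProfile = resourceProfile;
    }
}

/** Hypothetical provider that receives the slot as a return value, not via a side effect. */
final class CreatorBasedProviderSketch {
    private final BiFunction<String, String, SlotSketch> physicalSlotCreator;
    private final List<SlotSketch> createdSlots = new ArrayList<>();

    CreatorBasedProviderSketch(BiFunction<String, String, SlotSketch> physicalSlotCreator) {
        this.physicalSlotCreator = physicalSlotCreator;
    }

    /** The creator only builds the slot; the provider decides where to register it. */
    SlotSketch completeRequest(String allocationId, String resourceProfile) {
        SlotSketch slot = physicalSlotCreator.apply(allocationId, resourceProfile);
        createdSlots.add(slot);
        return slot;
    }

    List<SlotSketch> getCreatedSlots() {
        return Collections.unmodifiableList(createdSlots);
    }
}
```

A test would then pass something like `SlotSketch::new` as the creator and read the created slots back from the provider instead of collecting them inside the lambda.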

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
##########
@@ -179,10 +176,10 @@ private SchedulerBase createScheduler(
 			final JobVertex... vertices) throws Exception {
 
 		final JobGraph jobGraph = new JobGraph(new JobID(), "test job", vertices);
-		final SlotProvider slotProvider = new TestingSlotProvider(
-			ignored -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
-		final SchedulerBase scheduler = SchedulerTestingUtils
-			.newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, Time.seconds(10))
+		final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+			.setExecutionSlotAllocatorFactory(SchedulerTestingUtils.slotSharingExecutionSlotAllocatorFactoryBuilder()

Review comment:
       Why is the allocation timeout of 10s important? The test also passes with `withAllocationTimeout` removed.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingLocalInputPreferredSlotSharingStrategyFactory.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Set;
+
+/**
+ * {@link LocalInputPreferredSlotSharingStrategy.Factory} extension that returns the most recently
+ * created {@link LocalInputPreferredSlotSharingStrategy} instance for testing purposes.
+ */
+public class TestingLocalInputPreferredSlotSharingStrategyFactory extends LocalInputPreferredSlotSharingStrategy.Factory {

Review comment:
       I think this is not needed if we test the `TaskManagerLocations` instead of the `ExecutionSlotSharingGroupIDs`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
##########
@@ -130,22 +132,23 @@ public void testConstraintsAfterRestart() throws Exception {
 			timeout);
 
 		//checking execution vertex properties
-		validateConstraints(eg);
+		validateConstraints(eg, slotSharingStrategyFactory.getMostRecentStrategyInstance());
 
 		ExecutionGraphTestUtils.finishAllVertices(eg);
 
 		assertThat(eg.getState(), is(FINISHED));
 	}
 
-	private void validateConstraints(ExecutionGraph eg) {
+	private void validateConstraints(ExecutionGraph eg, SlotSharingStrategy slotSharingStrategy) {
 
 		ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
 
 		for (int i = 0; i < NUM_TASKS; i++) {
-			CoLocationConstraint constr1 = tasks[0].getTaskVertices()[i].getLocationConstraint();
-			CoLocationConstraint constr2 = tasks[1].getTaskVertices()[i].getLocationConstraint();
-			assertThat(constr1.isAssigned(), is(true));
-			assertThat(constr1.getLocation(), equalTo(constr2.getLocation()));
+			ExecutionVertexID executionVertexID0 = tasks[0].getTaskVertices()[i].getID();
+			ExecutionVertexID executionVertexID1 = tasks[1].getTaskVertices()[i].getID();
+
+			assertThat(slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID0).getExecutionVertexIds(), hasItem(executionVertexID1));
+			assertThat(slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID1).getExecutionVertexIds(), hasItem(executionVertexID0));

Review comment:
       Instead of relying on internal details, I think it would be good enough to assert the following:
   
   ```
   final TaskManagerLocation location0 = tasks[0].getTaskVertices()[i]
   	.getCurrentAssignedResourceLocation();
   final TaskManagerLocation location1 = tasks[1].getTaskVertices()[i]
   	.getCurrentAssignedResourceLocation();
   
   assertThat(location0, is(equalTo(location1)));
   ```
   
   Moreover, we should set up the test so that every `PhysicalSlot` has a different `TaskManagerLocation`. That way, equal locations can only result from the co-location constraints being honored.







[GitHub] [flink] flinkbot edited a comment on pull request #14465: [FLINK-20594][runtime] Remove DefaultExecutionSlotAllocator and its Factory class.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14465:
URL: https://github.com/apache/flink/pull/14465#issuecomment-749579707


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11187",
       "triggerID" : "1662fceecaa769723566c9aecb36827dd5104b23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11260",
       "triggerID" : "2147c4b63bfdf589e2c5ab167b0c41cacc7a514c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11342",
       "triggerID" : "23d374243c9726e72ec4b1e1e87739739cded4b9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bcf71e17c9821db76fa474ad0316abefc5b1cfec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11378",
       "triggerID" : "5dfaefedb78d18a8cddda978cf121eacdb481789",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384",
       "triggerID" : "6898634d33237f76e7197479432fd7840f3a8a83",
       "triggerType" : "PUSH"
     }, {
       "hash" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "353160767d25e932e5f650a335574ca2eb9a88e1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bcf71e17c9821db76fa474ad0316abefc5b1cfec UNKNOWN
   * 6898634d33237f76e7197479432fd7840f3a8a83 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11384) 
   * 353160767d25e932e5f650a335574ca2eb9a88e1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

