Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/30 08:31:26 UTC

[GitHub] [flink] azagrebin opened a new pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

azagrebin opened a new pull request #13028:
URL: https://github.com/apache/flink/pull/13028


   ## What is the purpose of the change
   
   Input location preferences will be considered for each `SharedSlot` when allocating a physical slot for it. Input location preferences of a `SharedSlot` are the merged input location preferences of all the tasks to run in this `SharedSlot`.
   
   Inter-`ExecutionSlotSharingGroup` input location preferences can be respected in this way for `ExecutionSlotSharingGroup`s belonging to different bulks. If `ExecutionSlotSharingGroup`s belong to the same bulk, the input location preferences are ignored because of possible cyclic dependencies. Later, we can optimise this case when the declarative resource management for reactive mode is ready.
   
   Intra-`ExecutionSlotSharingGroup` input location preferences will also be respected when creating `ExecutionSlotSharingGroup`s in `LocalInputPreferredSlotSharingStrategy`.
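
The merging rule described above can be sketched roughly as follows. This is only an illustration, not the code in this PR: plain strings stand in for `ExecutionVertexID` and `TaskManagerLocation`, and the class name, method name, and the two lookup maps are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: strings stand in for ExecutionVertexID and TaskManagerLocation. */
final class SharedSlotPreferenceSketch {

    /**
     * Merges the input location preferences of all tasks that run in one shared slot.
     * Producers that belong to the bulk being scheduled are skipped.
     */
    static List<String> mergePreferredLocations(
            List<String> tasksInSharedSlot,
            Map<String, List<String>> producersByTask,   // hypothetical lookup: task -> its producers
            Map<String, String> locationByProducer,      // hypothetical lookup: producer -> known location
            Set<String> bulk) {
        List<String> merged = new ArrayList<>();
        for (String task : tasksInSharedSlot) {
            List<String> producers = producersByTask.getOrDefault(task, Collections.emptyList());
            for (String producer : producers) {
                if (bulk.contains(producer)) {
                    continue; // same bulk: ignored because of possible cyclic dependencies
                }
                String location = locationByProducer.get(producer);
                if (location != null) {
                    merged.add(location); // location of a producer outside the bulk
                }
            }
        }
        return merged;
    }
}
```

In this sketch, a shared slot holding `map-0` and `sink-0` only inherits the locations of producers outside the bulk; the `map-0` to `sink-0` edge inside the bulk contributes nothing.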
   
   The PR is based on #13018 and #13009.
   
   ## Brief change log
   
     - Introduce interfaces for `SharedSlotProfileRetriever` and `SharedSlotProfileRetrieverFactory`
     - Implement them with `MergingSharedSlotProfileRetriever` and `MergingSharedSlotProfileRetrieverFactory`
     - Add unit tests
   
   ## Verifying this change
   
   Unit tests.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-666235055


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5045",
       "triggerID" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "triggerType" : "PUSH"
     }, {
       "hash" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5155",
       "triggerID" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 926fb3518b1b611b53901b4b9fcc834a0c0f9da1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5155) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-666227849


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit eb0a8e585b2aba939c4a43ba409566f3df287848 (Thu Jul 30 08:34:20 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13028:
URL: https://github.com/apache/flink/pull/13028#discussion_r465441978



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;

Review comment:
       This violates checkstyle: "ImportOrder: 'org.junit.Test' should be separated from previous imports."







[GitHub] [flink] flinkbot edited a comment on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-666235055


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5045",
       "triggerID" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "triggerType" : "PUSH"
     }, {
       "hash" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5155",
       "triggerID" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9bcb9cf1ff714e2c984162edc84b442c961baf13",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5191",
       "triggerID" : "9bcb9cf1ff714e2c984162edc84b442c961baf13",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9bcb9cf1ff714e2c984162edc84b442c961baf13 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5191) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13028:
URL: https://github.com/apache/flink/pull/13028#discussion_r464215530



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for {@link MergingSharedSlotProfileRetriever}.
+ */
+class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
+	private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
+
+	private final Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever;
+
+	MergingSharedSlotProfileRetrieverFactory(
+			PreferredLocationsRetriever preferredLocationsRetriever,
+			Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
+			Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever) {

Review comment:
       `prioAllocationIdRetriever` -> `priorAllocationIdRetriever`

Review comment:
       This also applies to the factory field with the same name.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for {@link MergingSharedSlotProfileRetriever}.
+ */
+class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
+	private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
+
+	private final Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever;
+
+	MergingSharedSlotProfileRetrieverFactory(
+			PreferredLocationsRetriever preferredLocationsRetriever,
+			Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
+			Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever) {
+		this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever);
+		this.resourceProfileRetriever = Preconditions.checkNotNull(resourceProfileRetriever);
+		this.prioAllocationIdRetriever = Preconditions.checkNotNull(prioAllocationIdRetriever);
+	}
+
+	@Override
+	public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) {
+		Set<AllocationID> allPriorAllocationIds = bulk
+			.stream()
+			.map(prioAllocationIdRetriever)
+			.filter(Objects::nonNull)
+			.collect(Collectors.toSet());
+		return new MergingSharedSlotProfileRetriever(allPriorAllocationIds, bulk);
+	}
+
+	/**
+	 * Computes a merged {@link SlotProfile} of an execution slot sharing group within a bulk to schedule.
+	 */
+	private class MergingSharedSlotProfileRetriever implements SharedSlotProfileRetriever {
+		/**
+		 * All previous {@link AllocationID}s of the bulk to schedule.
+		 */
+		private final Set<AllocationID> allBulkPriorAllocationIds;
+
+		/**
+		 * All {@link ExecutionVertexID}s of the bulk.
+		 */
+		private final Set<ExecutionVertexID> producersToIgnore;
+
+		private MergingSharedSlotProfileRetriever(
+			Set<AllocationID> allBulkPriorAllocationIds,
+			Set<ExecutionVertexID> producersToIgnore) {

Review comment:
       Maybe add one more level of indentation, or an empty line, to separate the method header from the body?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link org.apache.flink.runtime.scheduler.MergingSharedSlotProfileRetrieverFactory}.
+ */
+public class MergingSharedSlotProfileRetrieverTest {
+
+	private static final PreferredLocationsRetriever EMPTY_PREFERRED_LOCATIONS_RETRIEVER =
+		(executionVertexId, producersToIgnore) -> CompletableFuture.completedFuture(Collections.emptyList());
+
+	@Test
+	public void testGetEmptySlotProfile() throws ExecutionException, InterruptedException {
+		SharedSlotProfileRetriever sharedSlotProfileRetriever = new MergingSharedSlotProfileRetrieverFactory(
+			EMPTY_PREFERRED_LOCATIONS_RETRIEVER,
+			executionVertexID -> ResourceProfile.ZERO,
+			executionVertexID -> new AllocationID()
+		).createFromBulk(Collections.emptySet());
+
+		SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfileFuture(new ExecutionSlotSharingGroup()).get();
+
+		assertThat(slotProfile.getTaskResourceProfile(), is(ResourceProfile.ZERO));
+		assertThat(slotProfile.getPhysicalSlotResourceProfile(), is(ResourceProfile.ZERO));
+		assertThat(slotProfile.getPreferredLocations(), containsInAnyOrder(Collections.emptyList()));

Review comment:
       ```suggestion
   		assertThat(slotProfile.getPreferredLocations(), hasSize(0));
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for {@link MergingSharedSlotProfileRetriever}.
+ */
+class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
+	private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
+
+	private final Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever;
+
+	MergingSharedSlotProfileRetrieverFactory(
+			PreferredLocationsRetriever preferredLocationsRetriever,
+			Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
+			Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever) {
+		this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever);
+		this.resourceProfileRetriever = Preconditions.checkNotNull(resourceProfileRetriever);
+		this.prioAllocationIdRetriever = Preconditions.checkNotNull(prioAllocationIdRetriever);
+	}
+
+	@Override
+	public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) {
+		Set<AllocationID> allPriorAllocationIds = bulk
+			.stream()
+			.map(prioAllocationIdRetriever)
+			.filter(Objects::nonNull)
+			.collect(Collectors.toSet());
+		return new MergingSharedSlotProfileRetriever(allPriorAllocationIds, bulk);
+	}
+
+	/**
+	 * Computes a merged {@link SlotProfile} of an execution slot sharing group within a bulk to schedule.
+	 */
+	private class MergingSharedSlotProfileRetriever implements SharedSlotProfileRetriever {
+		/**
+		 * All previous {@link AllocationID}s of the bulk to schedule.
+		 */
+		private final Set<AllocationID> allBulkPriorAllocationIds;
+
+		/**
+		 * All {@link ExecutionVertexID}s of the bulk.
+		 */
+		private final Set<ExecutionVertexID> producersToIgnore;
+
+		private MergingSharedSlotProfileRetriever(
+			Set<AllocationID> allBulkPriorAllocationIds,
+			Set<ExecutionVertexID> producersToIgnore) {
+			this.allBulkPriorAllocationIds = Preconditions.checkNotNull(allBulkPriorAllocationIds);
+			this.producersToIgnore = Preconditions.checkNotNull(producersToIgnore);
+		}
+
+		/**
+		 * Computes a {@link SlotProfile} of an execution slot sharing group.
+		 *
+		 * <p>The {@link ResourceProfile} of the {@link SlotProfile} is the merged {@link ResourceProfile}s
+		 * of all executions sharing the slot.
+		 *
+		 * <p>The preferred locations of the {@link SlotProfile} is a union of the preferred locations
+		 * of all executions sharing the slot. The input locations within the bulk are ignored to avoid cyclic dependencies
+		 * within the region, e.g. in case of all-to-all pipelined connections, so that the allocations do not block each other.
+		 *
+		 * <p>The preferred {@link AllocationID}s of the {@link SlotProfile} are all previous {@link AllocationID}s
+		 * of all executions sharing the slot.
+		 *
+		 * <p>The {@link SlotProfile} also refers to all previous {@link AllocationID}s
+		 * of all executions within the bulk.
+		 *
+		 * @param executionSlotSharingGroup executions sharing the slot.
+		 * @return a future of the {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
+		 */
+		@Override
+		public CompletableFuture<SlotProfile> getSlotProfileFuture(ExecutionSlotSharingGroup executionSlotSharingGroup) {
+			ResourceProfile totalSlotResourceProfile = ResourceProfile.ZERO;
+			Collection<AllocationID> priorAllocations = new HashSet<>();
+			Collection<CompletableFuture<Collection<TaskManagerLocation>>> preferredLocationsPerExecution = new ArrayList<>();
+			for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) {
+				totalSlotResourceProfile = totalSlotResourceProfile.merge(resourceProfileRetriever.apply(execution));
+				priorAllocations.add(prioAllocationIdRetriever.apply(execution));
+				preferredLocationsPerExecution.add(preferredLocationsRetriever
+					.getPreferredLocations(execution, producersToIgnore));
+			}
+
+			CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils
+				.combineAll(preferredLocationsPerExecution)
+				.thenApply(executionPreferredLocations ->
+					executionPreferredLocations.stream().flatMap(Collection::stream).collect(Collectors.toSet()));

Review comment:
       I think a list is better than a set here, because `LocationPreferenceSlotSelectionStrategy` takes the frequency of each location into account when selecting the best location.
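
To illustrate the point about frequency, here is a minimal sketch that counts how often each location is preferred. It is a simplified stand-in, not the actual scoring in `LocationPreferenceSlotSelectionStrategy`; the class and method names are hypothetical and strings stand in for `TaskManagerLocation`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: strings stand in for TaskManagerLocation. */
final class LocationFrequencySketch {

    /** Counts how many times each location appears among the preferences. */
    static Map<String, Integer> countPreferences(Collection<String> preferredLocations) {
        Map<String, Integer> counts = new HashMap<>();
        for (String location : preferredLocations) {
            counts.merge(location, 1, Integer::sum); // duplicates raise the weight of a location
        }
        return counts;
    }
}
```

With the list `[tm-1, tm-1, tm-2]`, `tm-1` is counted twice and `tm-2` once. Once the same preferences are collected into a set, every location is counted once, so the information that `tm-1` hosts more inputs is lost.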







[GitHub] [flink] flinkbot edited a comment on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-666235055


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5045",
       "triggerID" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "triggerType" : "PUSH"
     }, {
       "hash" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5155",
       "triggerID" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9bcb9cf1ff714e2c984162edc84b442c961baf13",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9bcb9cf1ff714e2c984162edc84b442c961baf13",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 926fb3518b1b611b53901b4b9fcc834a0c0f9da1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5155) 
   * 9bcb9cf1ff714e2c984162edc84b442c961baf13 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-666235055


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5045",
       "triggerID" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eb0a8e585b2aba939c4a43ba409566f3df287848 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5045) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13028:
URL: https://github.com/apache/flink/pull/13028#discussion_r464247683



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for {@link MergingSharedSlotProfileRetriever}.
+ */
+class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
+	private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
+
+	private final Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever;
+
+	MergingSharedSlotProfileRetrieverFactory(
+			PreferredLocationsRetriever preferredLocationsRetriever,
+			Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
+			Function<ExecutionVertexID, AllocationID> prioAllocationIdRetriever) {
+		this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever);
+		this.resourceProfileRetriever = Preconditions.checkNotNull(resourceProfileRetriever);
+		this.prioAllocationIdRetriever = Preconditions.checkNotNull(prioAllocationIdRetriever);
+	}
+
+	@Override
+	public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) {
+		Set<AllocationID> allPriorAllocationIds = bulk
+			.stream()
+			.map(prioAllocationIdRetriever)
+			.filter(Objects::nonNull)
+			.collect(Collectors.toSet());
+		return new MergingSharedSlotProfileRetriever(allPriorAllocationIds, bulk);
+	}
+
+	/**
+	 * Computes a merged {@link SlotProfile} of an execution slot sharing group within a bulk to schedule.
+	 */
+	private class MergingSharedSlotProfileRetriever implements SharedSlotProfileRetriever {
+		/**
+		 * All previous {@link AllocationID}s of the bulk to schedule.
+		 */
+		private final Set<AllocationID> allBulkPriorAllocationIds;
+
+		/**
+		 * All {@link ExecutionVertexID}s of the bulk.
+		 */
+		private final Set<ExecutionVertexID> producersToIgnore;
+
+		private MergingSharedSlotProfileRetriever(
+			Set<AllocationID> allBulkPriorAllocationIds,
+			Set<ExecutionVertexID> producersToIgnore) {
+			this.allBulkPriorAllocationIds = Preconditions.checkNotNull(allBulkPriorAllocationIds);
+			this.producersToIgnore = Preconditions.checkNotNull(producersToIgnore);
+		}
+
+		/**
+		 * Computes a {@link SlotProfile} of an execution slot sharing group.
+		 *
+		 * <p>The {@link ResourceProfile} of the {@link SlotProfile} is the merged {@link ResourceProfile}s
+		 * of all executions sharing the slot.
+		 *
+		 * <p>The preferred locations of the {@link SlotProfile} is a union of the preferred locations
+		 * of all executions sharing the slot. The input locations within the bulk are ignored to avoid cyclic dependencies
+		 * within the region, e.g. in case of all-to-all pipelined connections, so that the allocations do not block each other.
+		 *
+		 * <p>The preferred {@link AllocationID}s of the {@link SlotProfile} are all previous {@link AllocationID}s
+		 * of all executions sharing the slot.
+		 *
+		 * <p>The {@link SlotProfile} also refers to all previous {@link AllocationID}s
+		 * of all executions within the bulk.
+		 *
+		 * @param executionSlotSharingGroup executions sharing the slot.
+		 * @return a future of the {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
+		 */
+		@Override
+		public CompletableFuture<SlotProfile> getSlotProfileFuture(ExecutionSlotSharingGroup executionSlotSharingGroup) {
+			ResourceProfile totalSlotResourceProfile = ResourceProfile.ZERO;
+			Collection<AllocationID> priorAllocations = new HashSet<>();
+			Collection<CompletableFuture<Collection<TaskManagerLocation>>> preferredLocationsPerExecution = new ArrayList<>();
+			for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) {
+				totalSlotResourceProfile = totalSlotResourceProfile.merge(resourceProfileRetriever.apply(execution));
+				priorAllocations.add(prioAllocationIdRetriever.apply(execution));
+				preferredLocationsPerExecution.add(preferredLocationsRetriever
+					.getPreferredLocations(execution, producersToIgnore));
+			}
+
+			CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils
+				.combineAll(preferredLocationsPerExecution)
+				.thenApply(executionPreferredLocations ->
+					executionPreferredLocations.stream().flatMap(Collection::stream).collect(Collectors.toSet()));

Review comment:
       I think a list is needed here instead of a set, because the frequency of each location is taken into account when selecting the best location in `LocationPreferenceSlotSelectionStrategy`.
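
       A minimal sketch of the point above, assuming plain strings stand in for `TaskManagerLocation` and that the selection step counts how often each location occurs. The class and method names are hypothetical and this is not the actual `LocationPreferenceSlotSelectionStrategy` code; it only shows that collecting into a set discards the frequency information that a list keeps.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration; strings stand in for TaskManagerLocation.
public class LocationFrequencySketch {

    // Merging into a set: a location preferred by several executions appears once.
    static Set<String> mergeAsSet(Collection<List<String>> perExecution) {
        return perExecution.stream().flatMap(Collection::stream).collect(Collectors.toSet());
    }

    // Merging into a list: every occurrence is kept, so frequency survives.
    static List<String> mergeAsList(Collection<List<String>> perExecution) {
        return perExecution.stream().flatMap(Collection::stream).collect(Collectors.toList());
    }

    // Assumed scoring step: count how often each location was preferred.
    static Map<String, Long> countPerLocation(Collection<String> merged) {
        return merged.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // Three executions share a slot; all of them prefer "tm-1", one also prefers "tm-2".
        List<List<String>> perExecution = Arrays.asList(
            Arrays.asList("tm-1"),
            Arrays.asList("tm-1", "tm-2"),
            Arrays.asList("tm-1"));

        System.out.println("set:  " + countPerLocation(mergeAsSet(perExecution)));
        System.out.println("list: " + countPerLocation(mergeAsList(perExecution)));
    }
}
```

       With the set, both locations end up with the same count, so a frequency-based choice cannot tell them apart; with the list, `tm-1` keeps its three votes.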







[GitHub] [flink] azagrebin commented on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
azagrebin commented on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-668421378


   Thanks for the review, @zhuzhurk.
   I have addressed the comments.





[GitHub] [flink] azagrebin closed pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
azagrebin closed pull request #13028:
URL: https://github.com/apache/flink/pull/13028


   





[GitHub] [flink] flinkbot commented on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-666235055


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eb0a8e585b2aba939c4a43ba409566f3df287848 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-666227849


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit eb0a8e585b2aba939c4a43ba409566f3df287848 (Thu Jul 30 08:34:20 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-666235055


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5045",
       "triggerID" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "triggerType" : "PUSH"
     }, {
       "hash" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5155",
       "triggerID" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9bcb9cf1ff714e2c984162edc84b442c961baf13",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5191",
       "triggerID" : "9bcb9cf1ff714e2c984162edc84b442c961baf13",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 926fb3518b1b611b53901b4b9fcc834a0c0f9da1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5155) 
   * 9bcb9cf1ff714e2c984162edc84b442c961baf13 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5191) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13028: [FLINK-18739] Implement MergingSharedSlotProfileRetriever

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13028:
URL: https://github.com/apache/flink/pull/13028#issuecomment-666235055


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5045",
       "triggerID" : "eb0a8e585b2aba939c4a43ba409566f3df287848",
       "triggerType" : "PUSH"
     }, {
       "hash" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "926fb3518b1b611b53901b4b9fcc834a0c0f9da1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eb0a8e585b2aba939c4a43ba409566f3df287848 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5045) 
   * 926fb3518b1b611b53901b4b9fcc834a0c0f9da1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

