You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/07/19 11:54:58 UTC

[flink] branch master updated: [FLINK-32586][coordination] Enable input locality in SimpleExecutionSlotAllocator.

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e732edb41a4 [FLINK-32586][coordination] Enable input locality in SimpleExecutionSlotAllocator.
e732edb41a4 is described below

commit e732edb41a423f19d5eefc397ddbfacadaf0179e
Author: sunxia <xi...@gmail.com>
AuthorDate: Tue Jul 18 17:11:01 2023 +0800

    [FLINK-32586][coordination] Enable input locality in SimpleExecutionSlotAllocator.
    
    This closes #23009.
---
 .../executiongraph/SpeculativeExecutionVertex.java |  6 ----
 .../scheduler/SimpleExecutionSlotAllocator.java    | 18 +++++++++--
 .../SimpleExecutionSlotAllocatorTest.java          | 36 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index dbd5f84a461..8179d541210 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -308,10 +308,4 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
         throw new UnsupportedOperationException(
                 "Method is not supported in SpeculativeExecutionVertex.");
     }
-
-    @Override
-    public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
-        throw new UnsupportedOperationException(
-                "Method is not supported in SpeculativeExecutionVertex.");
-    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
index e5d6d8ad1e7..07251ef42cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
@@ -28,11 +28,14 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.DualKeyLinkedMap;
 import org.apache.flink.util.FlinkException;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -41,7 +44,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A simple implementation of {@link ExecutionSlotAllocator}. No support for slot sharing,
- * co-location, state/input locality, nor local recovery.
+ * co-location, nor local recovery.
  */
 public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
     private final PhysicalSlotProvider slotProvider;
@@ -50,6 +53,8 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
 
     private final Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever;
 
+    private final SyncPreferredLocationsRetriever preferredLocationsRetriever;
+
     private final DualKeyLinkedMap<
                     ExecutionAttemptID, SlotRequestId, CompletableFuture<LogicalSlot>>
             requestedPhysicalSlots;
@@ -57,10 +62,12 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
     SimpleExecutionSlotAllocator(
             PhysicalSlotProvider slotProvider,
             Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever,
+            SyncPreferredLocationsRetriever preferredLocationsRetriever,
             boolean slotWillBeOccupiedIndefinitely) {
         this.slotProvider = checkNotNull(slotProvider);
         this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
         this.resourceProfileRetriever = checkNotNull(resourceProfileRetriever);
+        this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever);
         this.requestedPhysicalSlots = new DualKeyLinkedMap<>();
     }
 
@@ -78,11 +85,14 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
         }
         final SlotRequestId slotRequestId = new SlotRequestId();
         final ResourceProfile resourceProfile = resourceProfileRetriever.apply(executionAttemptId);
+        Collection<TaskManagerLocation> preferredLocations =
+                preferredLocationsRetriever.getPreferredLocations(
+                        executionAttemptId.getExecutionVertexId(), Collections.emptySet());
         final SlotProfile slotProfile =
                 SlotProfile.priorAllocation(
                         resourceProfile,
                         resourceProfile,
-                        Collections.emptyList(),
+                        preferredLocations,
                         Collections.emptyList(),
                         Collections.emptySet());
         final PhysicalSlotRequest request =
@@ -180,9 +190,13 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
 
         @Override
         public ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context) {
+            SyncPreferredLocationsRetriever preferredLocationsRetriever =
+                    new DefaultSyncPreferredLocationsRetriever(
+                            executionVertexId -> Optional.empty(), context);
             return new SimpleExecutionSlotAllocator(
                     slotProvider,
                     id -> context.getResourceProfile(id.getExecutionVertexId()),
+                    preferredLocationsRetriever,
                     slotWillBeOccupiedIndefinitely);
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
index 033ff2a1d26..8c8b4191f1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
@@ -18,18 +18,26 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingPayload;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.function.BiConsumerWithException;
 
 import org.junit.jupiter.api.Test;
 
+import java.net.InetAddress;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 
@@ -232,9 +240,29 @@ class SimpleExecutionSlotAllocatorTest {
         assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isTrue();
     }
 
+    @Test
+    void testPreferredLocationsOfSlotProfile() {
+        final AllocationContext context = new AllocationContext();
+        List<TaskManagerLocation> taskManagerLocations =
+                Collections.singletonList(
+                        new TaskManagerLocation(
+                                ResourceID.generate(), InetAddress.getLoopbackAddress(), 41));
+        context.getLocations()
+                .put(EXECUTION_ATTEMPT_ID.getExecutionVertexId(), taskManagerLocations);
+        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        assertThat(context.getSlotProvider().getRequests()).hasSize(1);
+        final PhysicalSlotRequest slotRequest =
+                context.getSlotProvider().getRequests().values().iterator().next();
+
+        assertThat(slotRequest.getSlotProfile().getPreferredLocations()).hasSize(1);
+        assertThat(slotRequest.getSlotProfile().getPreferredLocations())
+                .isEqualTo(taskManagerLocations);
+    }
+
     private static class AllocationContext {
         private final TestingPhysicalSlotProvider slotProvider;
         private final boolean slotWillBeOccupiedIndefinitely;
+        private final Map<ExecutionVertexID, Collection<TaskManagerLocation>> locations;
         private final SimpleExecutionSlotAllocator allocator;
 
         public AllocationContext() {
@@ -245,10 +273,14 @@ class SimpleExecutionSlotAllocatorTest {
                 TestingPhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
             this.slotProvider = slotProvider;
             this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+            this.locations = new HashMap<>();
             this.allocator =
                     new SimpleExecutionSlotAllocator(
                             slotProvider,
                             executionAttemptId -> RESOURCE_PROFILE,
+                            (executionVertexId, producersToIgnore) ->
+                                    locations.getOrDefault(
+                                            executionVertexId, Collections.emptyList()),
                             slotWillBeOccupiedIndefinitely);
         }
 
@@ -268,6 +300,10 @@ class SimpleExecutionSlotAllocatorTest {
             return slotWillBeOccupiedIndefinitely;
         }
 
+        public Map<ExecutionVertexID, Collection<TaskManagerLocation>> getLocations() {
+            return locations;
+        }
+
         public SimpleExecutionSlotAllocator getAllocator() {
             return allocator;
         }