You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/07/19 11:54:58 UTC
[flink] branch master updated: [FLINK-32586][coordination] Enable input locality in SimpleExecutionSlotAllocator.
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e732edb41a4 [FLINK-32586][coordination] Enable input locality in SimpleExecutionSlotAllocator.
e732edb41a4 is described below
commit e732edb41a423f19d5eefc397ddbfacadaf0179e
Author: sunxia <xi...@gmail.com>
AuthorDate: Tue Jul 18 17:11:01 2023 +0800
[FLINK-32586][coordination] Enable input locality in SimpleExecutionSlotAllocator.
This closes #23009.
---
.../executiongraph/SpeculativeExecutionVertex.java | 6 ----
.../scheduler/SimpleExecutionSlotAllocator.java | 18 +++++++++--
.../SimpleExecutionSlotAllocatorTest.java | 36 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index dbd5f84a461..8179d541210 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -308,10 +308,4 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
throw new UnsupportedOperationException(
"Method is not supported in SpeculativeExecutionVertex.");
}
-
- @Override
- public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
- throw new UnsupportedOperationException(
- "Method is not supported in SpeculativeExecutionVertex.");
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
index e5d6d8ad1e7..07251ef42cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
@@ -28,11 +28,14 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.util.FlinkException;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -41,7 +44,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A simple implementation of {@link ExecutionSlotAllocator}. No support for slot sharing,
- * co-location, state/input locality, nor local recovery.
+ * co-location, nor local recovery.
*/
public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
private final PhysicalSlotProvider slotProvider;
@@ -50,6 +53,8 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
private final Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever;
+ private final SyncPreferredLocationsRetriever preferredLocationsRetriever;
+
private final DualKeyLinkedMap<
ExecutionAttemptID, SlotRequestId, CompletableFuture<LogicalSlot>>
requestedPhysicalSlots;
@@ -57,10 +62,12 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
SimpleExecutionSlotAllocator(
PhysicalSlotProvider slotProvider,
Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever,
+ SyncPreferredLocationsRetriever preferredLocationsRetriever,
boolean slotWillBeOccupiedIndefinitely) {
this.slotProvider = checkNotNull(slotProvider);
this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
this.resourceProfileRetriever = checkNotNull(resourceProfileRetriever);
+ this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever);
this.requestedPhysicalSlots = new DualKeyLinkedMap<>();
}
@@ -78,11 +85,14 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
}
final SlotRequestId slotRequestId = new SlotRequestId();
final ResourceProfile resourceProfile = resourceProfileRetriever.apply(executionAttemptId);
+ Collection<TaskManagerLocation> preferredLocations =
+ preferredLocationsRetriever.getPreferredLocations(
+ executionAttemptId.getExecutionVertexId(), Collections.emptySet());
final SlotProfile slotProfile =
SlotProfile.priorAllocation(
resourceProfile,
resourceProfile,
- Collections.emptyList(),
+ preferredLocations,
Collections.emptyList(),
Collections.emptySet());
final PhysicalSlotRequest request =
@@ -180,9 +190,13 @@ public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
@Override
public ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context) {
+ SyncPreferredLocationsRetriever preferredLocationsRetriever =
+ new DefaultSyncPreferredLocationsRetriever(
+ executionVertexId -> Optional.empty(), context);
return new SimpleExecutionSlotAllocator(
slotProvider,
id -> context.getResourceProfile(id.getExecutionVertexId()),
+ preferredLocationsRetriever,
slotWillBeOccupiedIndefinitely);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
index 033ff2a1d26..8c8b4191f1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
@@ -18,18 +18,26 @@
package org.apache.flink.runtime.scheduler;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingPayload;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.BiConsumerWithException;
import org.junit.jupiter.api.Test;
+import java.net.InetAddress;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -232,9 +240,29 @@ class SimpleExecutionSlotAllocatorTest {
assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isTrue();
}
+ @Test
+ void testPreferredLocationsOfSlotProfile() {
+ final AllocationContext context = new AllocationContext();
+ List<TaskManagerLocation> taskManagerLocations =
+ Collections.singletonList(
+ new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 41));
+ context.getLocations()
+ .put(EXECUTION_ATTEMPT_ID.getExecutionVertexId(), taskManagerLocations);
+ context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+ assertThat(context.getSlotProvider().getRequests()).hasSize(1);
+ final PhysicalSlotRequest slotRequest =
+ context.getSlotProvider().getRequests().values().iterator().next();
+
+ assertThat(slotRequest.getSlotProfile().getPreferredLocations()).hasSize(1);
+ assertThat(slotRequest.getSlotProfile().getPreferredLocations())
+ .isEqualTo(taskManagerLocations);
+ }
+
private static class AllocationContext {
private final TestingPhysicalSlotProvider slotProvider;
private final boolean slotWillBeOccupiedIndefinitely;
+ private final Map<ExecutionVertexID, Collection<TaskManagerLocation>> locations;
private final SimpleExecutionSlotAllocator allocator;
public AllocationContext() {
@@ -245,10 +273,14 @@ class SimpleExecutionSlotAllocatorTest {
TestingPhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
this.slotProvider = slotProvider;
this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+ this.locations = new HashMap<>();
this.allocator =
new SimpleExecutionSlotAllocator(
slotProvider,
executionAttemptId -> RESOURCE_PROFILE,
+ (executionVertexId, producersToIgnore) ->
+ locations.getOrDefault(
+ executionVertexId, Collections.emptyList()),
slotWillBeOccupiedIndefinitely);
}
@@ -268,6 +300,10 @@ class SimpleExecutionSlotAllocatorTest {
return slotWillBeOccupiedIndefinitely;
}
+ public Map<ExecutionVertexID, Collection<TaskManagerLocation>> getLocations() {
+ return locations;
+ }
+
public SimpleExecutionSlotAllocator getAllocator() {
return allocator;
}