You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/03 22:42:52 UTC

[flink] branch master updated: [FLINK-21204][coordination] Remove LogicalSlot#getSlotSharingGroupId

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 21a36f8  [FLINK-21204][coordination] Remove LogicalSlot#getSlotSharingGroupId
21a36f8 is described below

commit 21a36f8dc29853233ed1aac6f4d09db5f9e03401
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jan 29 14:17:31 2021 +0100

    [FLINK-21204][coordination] Remove LogicalSlot#getSlotSharingGroupId
---
 .../org/apache/flink/runtime/jobmaster/LogicalSlot.java    |  9 ---------
 .../runtime/jobmaster/slotpool/SingleLogicalSlot.java      | 14 +-------------
 .../org/apache/flink/runtime/scheduler/SharedSlot.java     |  1 -
 .../executiongraph/ExecutionVertexLocalityTest.java        |  6 +-----
 .../apache/flink/runtime/jobmaster/TestingLogicalSlot.java | 11 -----------
 .../flink/runtime/jobmaster/TestingLogicalSlotBuilder.java |  8 --------
 .../runtime/jobmaster/slotpool/SingleLogicalSlotTest.java  |  2 +-
 .../org/apache/flink/runtime/scheduler/SharedSlotTest.java |  2 --
 8 files changed, 3 insertions(+), 50 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
index 027c0df..5ea1999 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -135,14 +134,6 @@ public interface LogicalSlot {
      */
     SlotRequestId getSlotRequestId();
 
-    /**
-     * Gets the slot sharing group id to which this slot belongs.
-     *
-     * @return slot sharing group id of this slot or null, if none.
-     */
-    @Nullable
-    SlotSharingGroupId getSlotSharingGroupId();
-
     /** Payload for a logical slot. */
     interface Payload {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index f8e767b..ccf0bca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -50,7 +49,6 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
     private final SlotContext slotContext;
 
     // null if the logical slot does not belong to a slot sharing group, otherwise non-null
-    @Nullable private final SlotSharingGroupId slotSharingGroupId;
 
     // locality of this slot wrt the requested preferred locations
     private final Locality locality;
@@ -72,23 +70,20 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
     public SingleLogicalSlot(
             SlotRequestId slotRequestId,
             SlotContext slotContext,
-            @Nullable SlotSharingGroupId slotSharingGroupId,
             Locality locality,
             SlotOwner slotOwner) {
 
-        this(slotRequestId, slotContext, slotSharingGroupId, locality, slotOwner, true);
+        this(slotRequestId, slotContext, locality, slotOwner, true);
     }
 
     public SingleLogicalSlot(
             SlotRequestId slotRequestId,
             SlotContext slotContext,
-            @Nullable SlotSharingGroupId slotSharingGroupId,
             Locality locality,
             SlotOwner slotOwner,
             boolean willBeOccupiedIndefinitely) {
         this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
         this.slotContext = Preconditions.checkNotNull(slotContext);
-        this.slotSharingGroupId = slotSharingGroupId;
         this.locality = Preconditions.checkNotNull(locality);
         this.slotOwner = Preconditions.checkNotNull(slotOwner);
         this.willBeOccupiedIndefinitely = willBeOccupiedIndefinitely;
@@ -154,12 +149,6 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
         return slotRequestId;
     }
 
-    @Nullable
-    @Override
-    public SlotSharingGroupId getSlotSharingGroupId() {
-        return slotSharingGroupId;
-    }
-
     public static SingleLogicalSlot allocateFromPhysicalSlot(
             final SlotRequestId slotRequestId,
             final PhysicalSlot physicalSlot,
@@ -171,7 +160,6 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
                 new SingleLogicalSlot(
                         slotRequestId,
                         physicalSlot,
-                        null,
                         locality,
                         slotOwner,
                         slotWillBeOccupiedIndefinitely);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
index 4d18035..24f0444 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
@@ -183,7 +183,6 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
         return new SingleLogicalSlot(
                 logicalSlotRequestId,
                 physicalSlot,
-                null,
                 Locality.UNKNOWN,
                 this,
                 slotWillBeOccupiedIndefinitely);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index a2e6f23..bc5d84a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -237,11 +237,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 
         LogicalSlot slot =
                 new SingleLogicalSlot(
-                        new SlotRequestId(),
-                        slotContext,
-                        null,
-                        Locality.LOCAL,
-                        mock(SlotOwner.class));
+                        new SlotRequestId(), slotContext, Locality.LOCAL, mock(SlotOwner.class));
 
         if (!vertex.getCurrentExecutionAttempt().tryAssignResource(slot)) {
             throw new FlinkException("Could not assign resource.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
index 2245c74..32e8359 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -51,8 +50,6 @@ public class TestingLogicalSlot implements LogicalSlot {
 
     private final SlotRequestId slotRequestId;
 
-    @Nullable private final SlotSharingGroupId slotSharingGroupId;
-
     private boolean released;
 
     TestingLogicalSlot(
@@ -61,7 +58,6 @@ public class TestingLogicalSlot implements LogicalSlot {
             int slotNumber,
             AllocationID allocationId,
             SlotRequestId slotRequestId,
-            @Nullable SlotSharingGroupId slotSharingGroupId,
             boolean automaticallyCompleteReleaseFuture,
             SlotOwner slotOwner) {
 
@@ -71,7 +67,6 @@ public class TestingLogicalSlot implements LogicalSlot {
         this.slotNumber = slotNumber;
         this.allocationId = Preconditions.checkNotNull(allocationId);
         this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
-        this.slotSharingGroupId = slotSharingGroupId;
         this.releaseFuture = new CompletableFuture<>();
         this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture;
         this.slotOwner = Preconditions.checkNotNull(slotOwner);
@@ -141,12 +136,6 @@ public class TestingLogicalSlot implements LogicalSlot {
         return slotRequestId;
     }
 
-    @Nullable
-    @Override
-    public SlotSharingGroupId getSlotSharingGroupId() {
-        return slotSharingGroupId;
-    }
-
     public CompletableFuture<?> getReleaseFuture() {
         return releaseFuture;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
index f5c0443..ac3366f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -33,7 +32,6 @@ public class TestingLogicalSlotBuilder {
     private int slotNumber = 0;
     private AllocationID allocationId = new AllocationID();
     private SlotRequestId slotRequestId = new SlotRequestId();
-    private SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
     private SlotOwner slotOwner = new DummySlotOwner();
     private boolean automaticallyCompleteReleaseFuture = true;
 
@@ -63,11 +61,6 @@ public class TestingLogicalSlotBuilder {
         return this;
     }
 
-    public TestingLogicalSlotBuilder setSlotSharingGroupId(SlotSharingGroupId slotSharingGroupId) {
-        this.slotSharingGroupId = slotSharingGroupId;
-        return this;
-    }
-
     public TestingLogicalSlotBuilder setAutomaticallyCompleteReleaseFuture(
             boolean automaticallyCompleteReleaseFuture) {
         this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture;
@@ -86,7 +79,6 @@ public class TestingLogicalSlotBuilder {
                 slotNumber,
                 allocationId,
                 slotRequestId,
-                slotSharingGroupId,
                 automaticallyCompleteReleaseFuture,
                 slotOwner);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
index 99f75c2..ce981da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
@@ -71,7 +71,7 @@ public class SingleLogicalSlotTest extends TestLogger {
 
     private SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner) {
         return new SingleLogicalSlot(
-                new SlotRequestId(), createSlotContext(), null, Locality.LOCAL, slotOwner);
+                new SlotRequestId(), createSlotContext(), Locality.LOCAL, slotOwner);
     }
 
     private static SlotContext createSlotContext() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
index 885c2c9..ac633c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
@@ -40,7 +40,6 @@ import java.util.function.Consumer;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId;
 import static org.apache.flink.runtime.scheduler.SharedSlotTestingUtils.createExecutionSlotSharingGroup;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -107,7 +106,6 @@ public class SharedSlotTest extends TestLogger {
         assertThat(logicalSlot.getTaskManagerGateway(), is(taskManagerGateway));
         assertThat(logicalSlot.getPhysicalSlotNumber(), is(physicalSlotNumber));
         assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
-        assertThat(logicalSlot.getSlotSharingGroupId(), nullValue());
     }
 
     @Test